Software /
code /
prosody
File
plugins/mod_storage_internal.lua @ 13235:dbd7a6b09ada
util.datamanager: Close file handle when done using it
The handle gets closed eventually, but under high load leaving handles open
could lead to reaching file descriptor (FD) limits sooner.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Fri, 21 Jul 2023 18:28:54 +0200 |
parent | 13228:616c578c644f |
child | 13262:9a86e7cbdd79 |
line wrap: on
line source
-- Storage driver built on the legacy datamanager (storagemanager.olddm).
-- Provides two store types: "keyval" (one value per user) and "archive"
-- (an ordered list of stanzas per user, with key/when/with metadata).
local cache = require "prosody.util.cache";
local datamanager = require "prosody.core.storagemanager".olddm;
local array = require "prosody.util.array";
local datetime = require "prosody.util.datetime";
local st = require "prosody.util.stanza";
local now = require "prosody.util.time".now;
local id = require "prosody.util.id".medium;
local jid_join = require "prosody.util.jid".join;
local set = require "prosody.util.set";
local it = require "prosody.util.iterators";

local host = module.host;

-- Maximum number of items per archive; append() refuses to add beyond this
local archive_item_limit = module:get_option_integer("storage_archive_item_limit", 10000, 0);
-- Cache of item counts, keyed by jid_join(username, host, store)
local archive_item_count_cache = cache.new(module:get_option_integer("storage_archive_item_limit_cache_size", 1000, 1));

-- When enabled, delete() with only an "end" field is delegated to trim()
local use_shift = module:get_option_boolean("storage_archive_experimental_fast_delete", false);

-- Timestamp of a stored archive item. Items written by append() carry a
-- numeric `when`; otherwise fall back to parsing the serialized stamp.
local function item_when(item)
	return item.when or datetime.parse(item.attr.stamp);
end

local driver = {};

-- Open a store of the given type (defaults to "keyval").
-- Returns the store object, or nil, "unsupported-store".
function driver:open(store, typ)
	local mt = self[typ or "keyval"]
	if not mt then
		return nil, "unsupported-store";
	end
	return setmetatable({ store = store, type = typ }, mt);
end

function driver:stores(username) -- luacheck: ignore 212/self
	return datamanager.stores(username, host);
end

function driver:purge(user) -- luacheck: ignore 212/self
	return datamanager.purge(user, host);
end

-- Key-value stores: thin wrappers around datamanager
local keyval = { };
driver.keyval = { __index = keyval };

function keyval:get(user)
	return datamanager.load(user, host, self.store);
end

function keyval:set(user, data)
	return datamanager.store(user, host, self.store, data);
end

function keyval:users()
	return datamanager.users(host, self.store, self.type);
end

-- Archive stores
local archive = {};
driver.archive = { __index = archive };

archive.caps = {
	total = true;
	quota = archive_item_limit;
	truncate = true;
	full_id_range = true;
	ids = true;
};

-- Add a stanza to the archive.
-- With a key: any existing item with the same key is replaced (the whole
-- list is loaded and rewritten). Without a key: a new id is generated and
-- the item is appended.
-- Returns the key, or nil and an error ("unsupported-datatype",
-- "quota-limit", or whatever datamanager reported).
function archive:append(username, key, value, when, with)
	when = when or now();
	if not st.is_stanza(value) then
		return nil, "unsupported-datatype";
	end
	value = st.preserialize(st.clone(value));
	value.when = when;
	value.with = with;
	value.attr.stamp = datetime.datetime(when);

	local cache_key = jid_join(username, host, self.store);
	local item_count = archive_item_count_cache:get(cache_key);

	if key then
		local items, err = datamanager.list_load(username, host, self.store);
		if not items and err then return items, err; end

		-- Check the quota
		item_count = items and #items or 0;
		archive_item_count_cache:set(cache_key, item_count);
		if item_count >= archive_item_limit then
			module:log("debug", "%s reached or over quota, not adding to store", username);
			return nil, "quota-limit";
		end

		if items then
			-- Filter out any item with the same key as the one being added
			items = array(items);
			items:filter(function (item)
				return item.key ~= key;
			end);

			value.key = key;
			items:push(value);
			local ok, err = datamanager.list_store(username, host, self.store, items);
			if not ok then return ok, err; end
			archive_item_count_cache:set(cache_key, #items);
			return key;
		end
		-- No existing list: fall through to the plain append below
	else
		if not item_count then -- Item count not cached?
			-- We need to load the list to get the number of items currently stored
			local items, err = datamanager.list_load(username, host, self.store);
			if not items and err then return items, err; end
			item_count = items and #items or 0;
			archive_item_count_cache:set(cache_key, item_count);
		end
		if item_count >= archive_item_limit then
			module:log("debug", "%s reached or over quota, not adding to store", username);
			return nil, "quota-limit";
		end
		key = id();
	end

	module:log("debug", "%s has %d items out of %d limit in store %s", username, item_count, archive_item_limit, self.store);
	value.key = key;

	local ok, err = datamanager.list_append(username, host, self.store, value);
	if not ok then return ok, err; end
	archive_item_count_cache:set(cache_key, item_count+1);
	return key;
end

-- Binary search over haystack[min..max] (defaults: 1..#haystack).
-- test(item) must return 0 for a match, a negative number if the wanted
-- position is before the item, and a positive number if it is after.
-- Returns index, item on an exact match, otherwise index, nil.
-- NOTE: the loop runs only while min < max, so when there is no exact match
-- the element at the returned index may never have been tested (this happens
-- for the last element). Callers must treat a non-exact result as an
-- approximate position only.
local function binary_search(haystack, test, min, max)
	if min == nil then
		min = 1;
	end
	if max == nil then
		max = #haystack;
	end

	local floor = math.floor;
	while min < max do
		local mid = floor((max + min) / 2);

		local result = test(haystack[mid]);
		if result < 0 then
			max = mid;
		elseif result > 0 then
			min = mid + 1;
		else
			return mid, haystack[mid];
		end
	end

	return min, nil;
end

-- Query the archive.
-- Query fields read here: reverse, key, ids, with, start, end, after,
-- before, limit (and total, only when the store is empty).
-- Returns an iterator yielding key, stanza, when, with; or nil and an error.
function archive:find(username, query)
	local list, err = datamanager.list_open(username, host, self.store);
	if not list then
		if err then
			return list, err;
		elseif query then
			if query.before or query.after then
				return nil, "item-not-found";
			end
			if query.total then
				return function() end, 0;
			end
		end
		return function() end;
	end

	-- `i` is the cursor shared by the base iterator and the positioning
	-- code below; the filters wrap the base iterator.
	local i = 0;
	local iter = function()
		i = i + 1;
		return list[i]
	end

	if query then
		if query.reverse then
			i = #list + 1
			iter = function()
				i = i - 1
				return list[i]
			end
		end
		if query.key then
			iter = it.filter(function(item)
				return item.key == query.key;
			end, iter);
		end
		if query.ids then
			local ids = set.new(query.ids);
			iter = it.filter(function(item)
				return ids:contains(item.key);
			end, iter);
		end
		if query.with then
			iter = it.filter(function(item)
				return item.with == query.with;
			end, iter);
		end
		if query.start then
			if not query.reverse then
				-- Use the binary search only to skip ahead. Its result is a
				-- conservative position: start iterating AT that index rather
				-- than after it, so the first matching item is not skipped.
				local wi = binary_search(list, function(item)
					return query.start - item_when(item);
				end);
				-- An exact hit may land in the middle of a run of items that
				-- share the timestamp; step back to the first of them.
				while wi > 1 and item_when(list[wi - 1]) >= query.start do
					wi = wi - 1;
				end
				i = wi - 1;
			end
			-- Always enforce the bound: the item at the search position may
			-- not have been tested by binary_search.
			iter = it.filter(function(item)
				return item_when(item) >= query.start;
			end, iter);
		end
		if query["end"] then
			if query.reverse then
				local wi = binary_search(list, function(item)
					return query["end"] - item_when(item);
				end);
				-- Step forward over items sharing the end timestamp so that
				-- none of them is skipped by the reverse iteration.
				local last = #list;
				while wi < last and item_when(list[wi + 1]) <= query["end"] do
					wi = wi + 1;
				end
				i = wi + 1;
			end
			-- Always enforce the bound: without an exact hit, the item at the
			-- search position may be newer than the requested end.
			iter = it.filter(function(item)
				return item_when(item) <= query["end"];
			end, iter);
		end
		if query.after then
			local found = false;
			iter = it.filter(function(item)
				-- Pass only items that come after the one with the given key
				local found_after = found;
				if item.key == query.after then
					found = true
				end
				return found_after;
			end, iter);
		end
		if query.before then
			local found = false;
			iter = it.filter(function(item)
				-- Stop passing items once the one with the given key is seen
				if item.key == query.before then
					found = true
				end
				return not found;
			end, iter);
		end
		if query.limit then
			iter = it.head(query.limit, iter);
		end
	end

	return function()
		local item = iter();
		if item == nil then
			return
		end
		local key = item.key;
		local when = item.when or item.attr and datetime.parse(item.attr.stamp);
		local with = item.with;
		-- Strip the storage metadata before turning the item back into a stanza
		item.key, item.when, item.with = nil, nil, nil;
		item.attr.stamp = nil;
		-- COMPAT Stored data may still contain legacy XEP-0091 timestamp
		item.attr.stamp_legacy = nil;
		item = st.deserialize(item);
		return key, item, when, with;
	end
end

-- Fetch a single item by key.
-- Returns stanza, when, with; or nil, "item-not-found" (or the find() error).
function archive:get(username, wanted_key)
	local iter, err = self:find(username, { key = wanted_key })
	if not iter then return iter, err; end
	for key, stanza, when, with in iter do
		if key == wanted_key then
			return stanza, when, with;
		end
	end
	return nil, "item-not-found";
end

-- Replace the item with the given key, keeping its previous timestamp and
-- `with` value unless new ones are supplied.
-- Returns the result of datamanager.list_store, or nil, "item-not-found".
function archive:set(username, key, new_value, new_when, new_with)
	local items, err = datamanager.list_load(username, host, self.store);
	if not items then
		if err then
			return items, err;
		else
			return nil, "item-not-found";
		end
	end

	for i = 1, #items do
		local old_item = items[i];
		if old_item.key == key then
			local item = st.preserialize(st.clone(new_value));

			local when = new_when or item_when(old_item);
			item.key = key;
			item.when = when;
			item.with = new_with or old_item.with;
			item.attr.stamp = datetime.datetime(when);
			items[i] = item;
			return datamanager.list_store(username, host, self.store, items);
		end
	end

	return nil, "item-not-found";
end

-- List the distinct dates for which the archive contains items.
function archive:dates(username)
	local items, err = datamanager.list_load(username, host, self.store);
	if not items then return items, err; end
	-- Use the stamp fallback so items without a numeric `when` are dated too
	return array(items):map(function (item)
		return datetime.date(item_when(item));
	end):unique();
end

-- Summarise the items matching a query, grouped by their `with` value:
-- number of items, first and last timestamp seen, and the most recent
-- non-empty body text.
function archive:summary(username, query)
	local iter, err = self:find(username, query)
	if not iter then return iter, err; end
	local counts = {};
	local earliest = {};
	local latest = {};
	local body = {};
	for _, stanza, when, with in iter do
		counts[with] = (counts[with] or 0) + 1;
		if earliest[with] == nil then
			earliest[with] = when;
		end
		latest[with] = when;
		body[with] = stanza:get_child_text("body") or body[with];
	end
	return {
		counts = counts;
		earliest = earliest;
		latest = latest;
		body = body;
	};
end

function archive:users()
	return datamanager.users(host, self.store, "list");
end

-- Drop old items from the front of the archive, up to roughly `to_when`,
-- using datamanager.list_shift instead of rewriting the list in memory.
-- Returns i-1, where i is the index passed to list_shift (presumably the
-- number of items removed -- confirm against util.datamanager), or nil and
-- an error.
function archive:trim(username, to_when)
	local cache_key = jid_join(username, host, self.store);
	local list, err = datamanager.list_open(username, host, self.store);
	if not list then
		if err == nil then
			-- No list and no error means the store is empty (find() and
			-- delete() treat it the same way): nothing to trim.
			return 0;
		end
		return list, err;
	end

	-- luacheck: ignore 211/exact
	local i, exact = binary_search(list, function(item)
		return to_when - item_when(item);
	end);
	-- TODO if exact then ... off by one?
	if i == 1 then return 0; end
	local ok, err = datamanager.list_shift(username, host, self.store, i);
	if not ok then return ok, err; end
	-- The stored list changed: forget the cached count so append() recounts
	-- instead of enforcing the quota against a stale number.
	archive_item_count_cache:set(cache_key, nil);
	return i-1;
end

-- Delete the items matching a query (or everything, if the query is nil or
-- empty). Query fields read here: key, with, start, end, truncate, reverse.
-- Returns the number of items removed, or nil and an error. When everything
-- is deleted, or the delete is delegated to trim(), the result of that call
-- is returned instead.
function archive:delete(username, query)
	local cache_key = jid_join(username, host, self.store);
	if not query or next(query) == nil then
		archive_item_count_cache:set(cache_key, nil);
		return datamanager.list_store(username, host, self.store, nil);
	end

	-- Fast path: the query consists of nothing but an "end" field
	if use_shift and next(query) == "end" and next(query, "end") == nil then
		return self:trim(username, query["end"]);
	end

	local items, err = datamanager.list_load(username, host, self.store);
	if not items then
		if err then
			return items, err;
		end
		archive_item_count_cache:set(cache_key, 0);
		-- Store is empty
		return 0;
	end
	items = array(items);
	local count_before = #items;

	-- Each filter keeps the items that must NOT be deleted
	if query.key then
		items:filter(function (item)
			return item.key ~= query.key;
		end);
	end
	if query.with then
		items:filter(function (item)
			return item.with ~= query.with;
		end);
	end
	if query.start then
		items:filter(function (item)
			return item_when(item) < query.start;
		end);
	end
	if query["end"] then
		items:filter(function (item)
			return item_when(item) > query["end"];
		end);
	end
	if query.truncate and #items > query.truncate then
		if query.reverse then
			-- Before: { 1, 2, 3, 4, 5, }
			-- After: { 1, 2, 3 }
			for i = #items, query.truncate + 1, -1 do
				items[i] = nil;
			end
		else
			-- Before: { 1, 2, 3, 4, 5, }
			-- After: { 3, 4, 5 }
			local offset = #items - query.truncate;
			-- The loop limit #items is evaluated once, so the tail slots are
			-- overwritten with nil as the remaining items move down.
			for i = 1, #items do
				items[i] = items[i+offset];
			end
		end
	end

	local count = count_before - #items;
	if count == 0 then
		return 0; -- No changes, skip write
	end
	local ok, err = datamanager.list_store(username, host, self.store, items);
	if not ok then return ok, err; end
	archive_item_count_cache:set(cache_key, #items);
	return count;
end

module:provides("storage", driver);