File

plugins/mod_storage_internal.lua @ 13235:dbd7a6b09ada

util.datamanager: Close file handle when done using it It gets closed eventually but at high load they could potentially lead to reaching FD limits faster.
author Kim Alvefur <zash@zash.se>
date Fri, 21 Jul 2023 18:28:54 +0200
parent 13228:616c578c644f
child 13262:9a86e7cbdd79
line wrap: on
line source

local cache = require "prosody.util.cache";
local datamanager = require "prosody.core.storagemanager".olddm;
local array = require "prosody.util.array";
local datetime = require "prosody.util.datetime";
local st = require "prosody.util.stanza";
local now = require "prosody.util.time".now;
local id = require "prosody.util.id".medium;
local jid_join = require "prosody.util.jid".join;
local set = require "prosody.util.set";
local it = require "prosody.util.iterators";

local host = module.host;

local archive_item_limit = module:get_option_integer("storage_archive_item_limit", 10000, 0);
local archive_item_count_cache = cache.new(module:get_option_integer("storage_archive_item_limit_cache_size", 1000, 1));

local use_shift = module:get_option_boolean("storage_archive_experimental_fast_delete", false);

local driver = {};

function driver:open(store, typ)
	local mt = self[typ or "keyval"]
	if not mt then
		return nil, "unsupported-store";
	end
	return setmetatable({ store = store, type = typ }, mt);
end

function driver:stores(username) -- luacheck: ignore 212/self
	return datamanager.stores(username, host);
end

function driver:purge(user) -- luacheck: ignore 212/self
	return datamanager.purge(user, host);
end

local keyval = { };
driver.keyval = { __index = keyval };

function keyval:get(user)
	return datamanager.load(user, host, self.store);
end

function keyval:set(user, data)
	return datamanager.store(user, host, self.store, data);
end

function keyval:users()
	return datamanager.users(host, self.store, self.type);
end

local archive = {};
driver.archive = { __index = archive };

archive.caps = {
	total = true;
	quota = archive_item_limit;
	truncate = true;
	full_id_range = true;
	ids = true;
};

function archive:append(username, key, value, when, with)
	when = when or now();
	if not st.is_stanza(value) then
		return nil, "unsupported-datatype";
	end
	value = st.preserialize(st.clone(value));
	value.when = when;
	value.with = with;
	value.attr.stamp = datetime.datetime(when);

	local cache_key = jid_join(username, host, self.store);
	local item_count = archive_item_count_cache:get(cache_key);

	if key then
		local items, err = datamanager.list_load(username, host, self.store);
		if not items and err then return items, err; end

		-- Check the quota
		item_count = items and #items or 0;
		archive_item_count_cache:set(cache_key, item_count);
		if item_count >= archive_item_limit then
			module:log("debug", "%s reached or over quota, not adding to store", username);
			return nil, "quota-limit";
		end

		if items then
			-- Filter out any item with the same key as the one being added
			items = array(items);
			items:filter(function (item)
				return item.key ~= key;
			end);

			value.key = key;
			items:push(value);
			local ok, err = datamanager.list_store(username, host, self.store, items);
			if not ok then return ok, err; end
			archive_item_count_cache:set(cache_key, #items);
			return key;
		end
	else
		if not item_count then -- Item count not cached?
			-- We need to load the list to get the number of items currently stored
			local items, err = datamanager.list_load(username, host, self.store);
			if not items and err then return items, err; end
			item_count = items and #items or 0;
			archive_item_count_cache:set(cache_key, item_count);
		end
		if item_count >= archive_item_limit then
			module:log("debug", "%s reached or over quota, not adding to store", username);
			return nil, "quota-limit";
		end
		key = id();
	end

	module:log("debug", "%s has %d items out of %d limit in store %s", username, item_count, archive_item_limit, self.store);

	value.key = key;

	local ok, err = datamanager.list_append(username, host, self.store, value);
	if not ok then return ok, err; end
	archive_item_count_cache:set(cache_key, item_count+1);
	return key;
end

local function binary_search(haystack, test, min, max)
	if min == nil then
		min = 1;
	end
	if max == nil then
		max = #haystack;
	end

	local floor = math.floor;
	while min < max do
		local mid = floor((max + min) / 2);

		local result = test(haystack[mid]);
		if result < 0 then
			max = mid;
		elseif result > 0 then
			min = mid + 1;
		else
			return mid, haystack[mid];
		end
	end

	return min, nil;
end

function archive:find(username, query)
	local list, err = datamanager.list_open(username, host, self.store);
	if not list then
		if err then
			return list, err;
		elseif query then
			if query.before or query.after then
				return nil, "item-not-found";
			end
			if query.total then
				return function()
				end, 0;
			end
		end
		return function()
		end;
	end

	local i = 0;
	local iter = function()
		i = i + 1;
		return list[i]
	end

	if query then
		if query.reverse then
			i = #list + 1
			iter = function()
				i = i - 1
				return list[i]
			end
		end
		if query.key then
			iter = it.filter(function(item)
				return item.key == query.key;
			end, iter);
		end
		if query.ids then
			local ids = set.new(query.ids);
			iter = it.filter(function(item)
				return ids:contains(item.key);
			end, iter);
		end
		if query.with then
			iter = it.filter(function(item)
				return item.with == query.with;
			end, iter);
		end
		if query.start then
			if not query.reverse then
				local wi, exact = binary_search(list, function(item)
					local when = item.when or datetime.parse(item.attr.stamp);
					return query.start - when;
				end);
				if exact then
					i = wi - 1;
				elseif wi then
					i = wi;
				end
			else
				iter = it.filter(function(item)
					local when = item.when or datetime.parse(item.attr.stamp);
					return when >= query.start;
				end, iter);
			end
		end
		if query["end"] then
			if query.reverse then
				local wi = binary_search(list, function(item)
					local when = item.when or datetime.parse(item.attr.stamp);
					return query["end"] - when;
				end);
				if wi then
					i = wi + 1;
				end
			else
				iter = it.filter(function(item)
					local when = item.when or datetime.parse(item.attr.stamp);
					return when <= query["end"];
				end, iter);
			end
		end
		if query.after then
			local found = false;
			iter = it.filter(function(item)
				local found_after = found;
				if item.key == query.after then
					found = true
				end
				return found_after;
			end, iter);
		end
		if query.before then
			local found = false;
			iter = it.filter(function(item)
				if item.key == query.before then
					found = true
				end
				return not found;
			end, iter);
		end
		if query.limit then
			iter = it.head(query.limit, iter);
		end
	end

	return function()
		local item = iter();
		if item == nil then
			return
		end
		local key = item.key;
		local when = item.when or item.attr and datetime.parse(item.attr.stamp);
		local with = item.with;
		item.key, item.when, item.with = nil, nil, nil;
		item.attr.stamp = nil;
		-- COMPAT Stored data may still contain legacy XEP-0091 timestamp
		item.attr.stamp_legacy = nil;
		item = st.deserialize(item);
		return key, item, when, with;
	end
end

function archive:get(username, wanted_key)
	local iter, err = self:find(username, { key = wanted_key })
	if not iter then return iter, err; end
	for key, stanza, when, with in iter do
		if key == wanted_key then
			return stanza, when, with;
		end
	end
	return nil, "item-not-found";
end

function archive:set(username, key, new_value, new_when, new_with)
	local items, err = datamanager.list_load(username, host, self.store);
	if not items then
		if err then
			return items, err;
		else
			return nil, "item-not-found";
		end
	end

	for i = 1, #items do
		local old_item = items[i];
		if old_item.key == key then
			local item = st.preserialize(st.clone(new_value));

			local when = new_when or old_item.when or datetime.parse(old_item.attr.stamp);
			item.key = key;
			item.when = when;
			item.with = new_with or old_item.with;
			item.attr.stamp = datetime.datetime(when);
			items[i] = item;
			return datamanager.list_store(username, host, self.store, items);
		end
	end

	return nil, "item-not-found";
end

function archive:dates(username)
	local items, err = datamanager.list_load(username, host, self.store);
	if not items then return items, err; end
	return array(items):pluck("when"):map(datetime.date):unique();
end

function archive:summary(username, query)
	local iter, err = self:find(username, query)
	if not iter then return iter, err; end
	local counts = {};
	local earliest = {};
	local latest = {};
	local body = {};
	for _, stanza, when, with in iter do
		counts[with] = (counts[with] or 0) + 1;
		if earliest[with] == nil then
			earliest[with] = when;
		end
		latest[with] = when;
		body[with] = stanza:get_child_text("body") or body[with];
	end
	return {
		counts = counts;
		earliest = earliest;
		latest = latest;
		body = body;
	};
end

function archive:users()
	return datamanager.users(host, self.store, "list");
end

function archive:trim(username, to_when)
	local list, err = datamanager.list_open(username, host, self.store);
	if not list then return list,err;end
	-- luacheck: ignore 211/exact
	local i, exact = binary_search(list, function(item)
		local when = item.when or datetime.parse(item.attr.stamp);
		return to_when - when;
	end);
	-- TODO if exact then ... off by one?
	if i == 1 then return 0; end
	local ok, err = datamanager.list_shift(username, host, self.store, i);
	if not ok then return ok, err; end
	return i-1;
end

function archive:delete(username, query)
	local cache_key = jid_join(username, host, self.store);
	if not query or next(query) == nil then
		archive_item_count_cache:set(cache_key, nil);
		return datamanager.list_store(username, host, self.store, nil);
	end

	if use_shift and next(query) == "end" and next(query, "end") == nil then
		return self:trim(username, query["end"]);
	end

	local items, err = datamanager.list_load(username, host, self.store);
	if not items then
		if err then
			return items, err;
		end
		archive_item_count_cache:set(cache_key, 0);
		-- Store is empty
		return 0;
	end
	items = array(items);
	local count_before = #items;
	if query then
		if query.key then
			items:filter(function (item)
				return item.key ~= query.key;
			end);
		end
		if query.with then
			items:filter(function (item)
				return item.with ~= query.with;
			end);
		end
		if query.start then
			items:filter(function (item)
				return item.when < query.start;
			end);
		end
		if query["end"] then
			items:filter(function (item)
				return item.when > query["end"];
			end);
		end
		if query.truncate and #items > query.truncate then
			if query.reverse then
				-- Before: { 1, 2, 3, 4, 5, }
				-- After: { 1, 2, 3 }
				for i = #items, query.truncate + 1, -1 do
					items[i] = nil;
				end
			else
				-- Before: { 1, 2, 3, 4, 5, }
				-- After: { 3, 4, 5 }
				local offset = #items - query.truncate;
				for i = 1, #items do
					items[i] = items[i+offset];
				end
			end
		end
	end
	local count = count_before - #items;
	if count == 0 then
		return 0; -- No changes, skip write
	end
	local ok, err = datamanager.list_store(username, host, self.store, items);
	if not ok then return ok, err; end
	archive_item_count_cache:set(cache_key, #items);
	return count;
end

module:provides("storage", driver);