Changeset

9883:f76bd399267c

mod_storage_internal,_sql: Add limit to number of items in an archive store (fixes #733)
author Matthew Wild <mwild1@gmail.com>
date Fri, 20 Oct 2017 12:53:53 +0200
parents 9882:18f025b3987d
children 9884:9751c17f5281
files plugins/mod_storage_internal.lua plugins/mod_storage_sql.lua
diffstat 2 files changed, 73 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/plugins/mod_storage_internal.lua	Fri Mar 22 17:58:08 2019 +0100
+++ b/plugins/mod_storage_internal.lua	Fri Oct 20 12:53:53 2017 +0200
@@ -1,3 +1,4 @@
+local cache = require "util.cache";
 local datamanager = require "core.storagemanager".olddm;
 local array = require "util.array";
 local datetime = require "util.datetime";
@@ -7,6 +8,9 @@
 
 local host = module.host;
 
+local archive_item_limit = module:get_option_number("storage_archive_item_limit", 1000);
+local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
+
 local driver = {};
 
 function driver:open(store, typ)
@@ -54,28 +58,56 @@
 	value.attr.stamp = datetime.datetime(when);
 	value.attr.stamp_legacy = datetime.legacy(when);
 
+	local item_count = archive_item_count_cache:get(username);
+
 	if key then
 		local items, err = datamanager.list_load(username, host, self.store);
 		if not items and err then return items, err; end
+
+		-- Check the quota
+		item_count = items and #items or 0;
+		archive_item_count_cache:set(username, item_count);
+		if item_count >= archive_item_limit then
+			module:log("debug", "%s reached or over quota, not adding to store", username);
+			return nil, "quota-limit";
+		end
+
 		if items then
+			-- Filter out any item with the same key as the one being added
 			items = array(items);
 			items:filter(function (item)
 				return item.key ~= key;
 			end);
+
 			value.key = key;
 			items:push(value);
 			local ok, err = datamanager.list_store(username, host, self.store, items);
 			if not ok then return ok, err; end
+			archive_item_count_cache:set(username, #items);
 			return key;
 		end
 	else
+		if not item_count then -- Item count not cached?
+			-- We need to load the list to get the number of items currently stored
+			local items, err = datamanager.list_load(username, host, self.store);
+			if not items and err then return items, err; end
+			item_count = items and #items or 0;
+			archive_item_count_cache:set(username, item_count);
+		end
+		if item_count >= archive_item_limit then
+			module:log("debug", "%s reached or over quota, not adding to store", username);
+			return nil, "quota-limit";
+		end
 		key = id();
 	end
 
+	module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
+
 	value.key = key;
 
 	local ok, err = datamanager.list_append(username, host, self.store, value);
 	if not ok then return ok, err; end
+	archive_item_count_cache:set(username, item_count+1);
 	return key;
 end
 
@@ -158,6 +190,7 @@
 
 function archive:delete(username, query)
 	if not query or next(query) == nil then
+		archive_item_count_cache:set(username, nil);
 		return datamanager.list_store(username, host, self.store, nil);
 	end
 	local items, err = datamanager.list_load(username, host, self.store);
@@ -165,6 +198,7 @@
 		if err then
 			return items, err;
 		end
+		archive_item_count_cache:set(username, 0);
 		-- Store is empty
 		return 0;
 	end
@@ -214,6 +248,7 @@
 	end
 	local ok, err = datamanager.list_store(username, host, self.store, items);
 	if not ok then return ok, err; end
+	archive_item_count_cache:set(username, #items);
 	return count;
 end
 
--- a/plugins/mod_storage_sql.lua	Fri Mar 22 17:58:08 2019 +0100
+++ b/plugins/mod_storage_sql.lua	Fri Oct 20 12:53:53 2017 +0200
@@ -1,6 +1,7 @@
 
 -- luacheck: ignore 212/self
 
+local cache = require "util.cache";
 local json = require "util.json";
 local sql = require "util.sql";
 local xml_parse = require "util.xml".parse;
@@ -148,6 +149,9 @@
 
 --- Archive store API
 
+local archive_item_limit = module:get_option_number("storage_archive_item_limit", 1000);
+local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
+
 -- luacheck: ignore 512 431/user 431/store
 local map_store = {};
 map_store.__index = map_store;
@@ -231,6 +235,32 @@
 };
 archive_store.__index = archive_store
 function archive_store:append(username, key, value, when, with)
+	local item_count = archive_item_count_cache:get(username);
+	if not item_count then
+		local ok, ret = engine:transaction(function()
+			local count_sql = [[
+			SELECT COUNT(*) FROM "prosodyarchive"
+			WHERE "host"=? AND "user"=? AND "store"=?;
+			]];
+			local result = engine:select(count_sql, host, user, store);
+			if result then
+				for row in result do
+					item_count = row[1];
+				end
+			end
+		end);
+		if not ok or not item_count then
+			module:log("error", "Failed while checking quota for %s: %s", username, ret);
+			return nil, "Failure while checking quota";
+		end
+		archive_item_count_cache:set(username, item_count);
+	end
+
+	module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
+	if item_count >= archive_item_limit then
+		return nil, "quota-limit";
+	end
+
 	local user,store = username,self.store;
 	when = when or os.time();
 	with = with or "";
@@ -245,12 +275,18 @@
 		VALUES (?,?,?,?,?,?,?,?);
 		]];
 		if key then
-			engine:delete(delete_sql, host, user or "", store, key);
+			local result, err = engine:delete(delete_sql, host, user or "", store, key);
+			if result then
+				item_count = item_count - result:affected();
+				archive_item_count_cache:set(username, item_count);
+			end
 		else
+			item_count = item_count + 1;
 			key = uuid.generate();
 		end
 		local t, encoded_value = assert(serialize(value));
 		engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value);
+		archive_item_count_cache:set(username, item_count+1);
 		return key;
 	end);
 	if not ok then return ok, ret; end
@@ -422,6 +458,7 @@
 		end
 		return engine:delete(sql_query, unpack(args));
 	end);
+	archive_item_count_cache:set(username, nil);
 	return ok and stmt:affected(), stmt;
 end