Comparison

plugins/mod_mam/mod_mam.lua @ 9879:ddc07fb8dcd4 0.11

mod_mam: Perform message expiry based on building an index by date (backport of 39ee70fbb009 from trunk) For each day, store a set of users that have new messages. To expire messages, we collect the union of sets of users from dates that fall outside the cleanup range. The previous algoritm did not work well with many users, especially with the default settings.
author Kim Alvefur <zash@zash.se>
date Fri, 22 Mar 2019 17:32:56 +0100
parent 9555:ed5a5ddcef17
child 9882:18f025b3987d
child 10028:79ba2d709e72
comparison
equal deleted inserted replaced
9856:4be2af104bf0 9879:ddc07fb8dcd4
31 31
32 local is_stanza = st.is_stanza; 32 local is_stanza = st.is_stanza;
33 local tostring = tostring; 33 local tostring = tostring;
34 local time_now = os.time; 34 local time_now = os.time;
35 local m_min = math.min; 35 local m_min = math.min;
36 local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse; 36 local timestamp, timestamp_parse, datestamp = import( "util.datetime", "datetime", "parse", "date");
37 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50); 37 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
38 local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" }); 38 local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" });
39 39
40 local archive_store = module:get_option_string("archive_store", "archive"); 40 local archive_store = module:get_option_string("archive_store", "archive");
41 local archive = module:open_store(archive_store, "archive"); 41 local archive = module:open_store(archive_store, "archive");
44 error("mod_"..(archive._provided_by or archive.name and "storage_"..archive.name).." does not support archiving\n" 44 error("mod_"..(archive._provided_by or archive.name and "storage_"..archive.name).." does not support archiving\n"
45 .."See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information"); 45 .."See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information");
46 end 46 end
47 local use_total = module:get_option_boolean("mam_include_total", true); 47 local use_total = module:get_option_boolean("mam_include_total", true);
48 48
49 local cleanup; 49 function schedule_cleanup()
50 50 -- replaced by non-noop later if cleanup is enabled
51 local function schedule_cleanup(username)
52 if cleanup and not cleanup[username] then
53 table.insert(cleanup, username);
54 cleanup[username] = true;
55 end
56 end 51 end
57 52
58 -- Handle prefs. 53 -- Handle prefs.
59 module:hook("iq/self/"..xmlns_mam..":prefs", function(event) 54 module:hook("iq/self/"..xmlns_mam..":prefs", function(event)
60 local origin, stanza = event.origin, event.stanza; 55 local origin, stanza = event.origin, event.stanza;
94 local origin, stanza = event.origin, event.stanza; 89 local origin, stanza = event.origin, event.stanza;
95 local query = stanza.tags[1]; 90 local query = stanza.tags[1];
96 local qid = query.attr.queryid; 91 local qid = query.attr.queryid;
97 92
98 get_prefs(origin.username, true); 93 get_prefs(origin.username, true);
99 schedule_cleanup(origin.username);
100 94
101 -- Search query parameters 95 -- Search query parameters
102 local qwith, qstart, qend; 96 local qwith, qstart, qend;
103 local form = query:get_child("x", "jabber:x:data"); 97 local form = query:get_child("x", "jabber:x:data");
104 if form then 98 if form then
210 end 204 end
211 205
212 local function shall_store(user, who) 206 local function shall_store(user, who)
213 -- TODO Cache this? 207 -- TODO Cache this?
214 if not um.user_exists(user, host) then 208 if not um.user_exists(user, host) then
209 module:log("debug", "%s@%s does not exist", user, host)
215 return false; 210 return false;
216 end 211 end
217 local prefs = get_prefs(user); 212 local prefs = get_prefs(user);
218 local rule = prefs[who]; 213 local rule = prefs[who];
219 module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule)); 214 module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule));
327 module:hook("pre-message/full", strip_stanza_id_after_other_events, -1); 322 module:hook("pre-message/full", strip_stanza_id_after_other_events, -1);
328 323
329 local cleanup_after = module:get_option_string("archive_expires_after", "1w"); 324 local cleanup_after = module:get_option_string("archive_expires_after", "1w");
330 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60); 325 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60);
331 if cleanup_after ~= "never" then 326 if cleanup_after ~= "never" then
327 local cleanup_storage = module:open_store("archive_cleanup");
328 local cleanup_map = module:open_store("archive_cleanup", "map");
329
332 local day = 86400; 330 local day = 86400;
333 local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day }; 331 local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day };
334 local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)"); 332 local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)");
335 if not n then 333 if not n then
336 module:log("error", "Could not parse archive_expires_after string %q", cleanup_after); 334 module:log("error", "Could not parse archive_expires_after string %q", cleanup_after);
344 if not archive.delete then 342 if not archive.delete then
345 module:log("error", "archive_expires_after set but mod_%s does not support deleting", archive._provided_by); 343 module:log("error", "archive_expires_after set but mod_%s does not support deleting", archive._provided_by);
346 return false; 344 return false;
347 end 345 end
348 346
349 -- Set of known users to do message expiry for 347 -- For each day, store a set of users that have new messages. To expire
350 -- Populated either below or when new messages are added 348 -- messages, we collect the union of sets of users from dates that fall
351 cleanup = {}; 349 -- outside the cleanup range.
352 350
353 -- Iterating over users is not supported by all authentication modules 351 function schedule_cleanup(username, date)
354 -- Catch and ignore error if not supported 352 cleanup_map:set(date or datestamp(), username, true);
355 pcall(function () 353 end
356 -- If this works, then we schedule cleanup for all known users on startup 354
357 for user in um.users(module.host) do 355 cleanup_runner = require "util.async".runner(function ()
358 schedule_cleanup(user); 356 local users = {};
359 end 357 local cut_off = datestamp(os.time() - cleanup_after);
358 for date in cleanup_storage:users() do
359 if date <= cut_off then
360 module:log("debug", "Messages from %q should be expired", date);
361 local messages_this_day = cleanup_storage:get(date);
362 if messages_this_day then
363 for user in pairs(messages_this_day) do
364 users[user] = true;
365 end
366 if date < cut_off then
367 -- Messages from the same day as the cut-off might not have expired yet,
368 -- but all earlier will have, so clear storage for those days.
369 cleanup_storage:set(date, nil);
370 end
371 end
372 end
373 end
374 local sum, num_users = 0, 0;
375 for user in pairs(users) do
376 local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
377 if ok then
378 num_users = num_users + 1;
379 sum = sum + (tonumber(ok) or 0);
380 end
381 end
382 module:log("info", "Deleted %d expired messages for %d users", sum, num_users);
360 end); 383 end);
361 384
362 -- At odd intervals, delete old messages for one user 385 cleanup_task = module:add_timer(1, function ()
363 module:add_timer(math.random(10, 60), function() 386 cleanup_runner:run(true);
364 local user = table.remove(cleanup, 1); 387 return cleanup_interval;
365 if user then
366 module:log("debug", "Removing old messages for user %q", user);
367 local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
368 if not ok then
369 module:log("warn", "Could not expire archives for user %s: %s", user, err);
370 elseif type(ok) == "number" then
371 module:log("debug", "Removed %d messages", ok);
372 end
373 cleanup[user] = nil;
374 end
375 return math.random(cleanup_interval, cleanup_interval * 2);
376 end); 388 end);
377 else 389 else
378 module:log("debug", "Archive expiry disabled"); 390 module:log("debug", "Archive expiry disabled");
379 -- Don't ask the backend to count the potentially unbounded number of items, 391 -- Don't ask the backend to count the potentially unbounded number of items,
380 -- it'll get slow. 392 -- it'll get slow.