Software /
code /
prosody
Comparison
plugins/mod_mam/mod_mam.lua @ 9751:39ee70fbb009
mod_mam: Perform message expiry based on building an index by date
For each day, store a set of users that have new messages. To expire
messages, we collect the union of sets of users from dates that fall
outside the cleanup range.
The previous algorithm did not work well with many users, especially with
the default settings.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 03 Jan 2019 17:25:43 +0100 |
parent | 9555:ed5a5ddcef17 |
child | 9752:5cc2765c3ce4 |
comparison
equal
deleted
inserted
replaced
9750:65432dc80d90 | 9751:39ee70fbb009 |
---|---|
31 | 31 |
32 local is_stanza = st.is_stanza; | 32 local is_stanza = st.is_stanza; |
33 local tostring = tostring; | 33 local tostring = tostring; |
34 local time_now = os.time; | 34 local time_now = os.time; |
35 local m_min = math.min; | 35 local m_min = math.min; |
36 local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse; | 36 local timestamp, timestamp_parse, datestamp = import( "util.datetime", "datetime", "parse", "date"); |
37 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50); | 37 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50); |
38 local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" }); | 38 local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" }); |
39 | 39 |
40 local archive_store = module:get_option_string("archive_store", "archive"); | 40 local archive_store = module:get_option_string("archive_store", "archive"); |
41 local archive = module:open_store(archive_store, "archive"); | 41 local archive = module:open_store(archive_store, "archive"); |
44 error("mod_"..(archive._provided_by or archive.name and "storage_"..archive.name).." does not support archiving\n" | 44 error("mod_"..(archive._provided_by or archive.name and "storage_"..archive.name).." does not support archiving\n" |
45 .."See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information"); | 45 .."See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information"); |
46 end | 46 end |
47 local use_total = module:get_option_boolean("mam_include_total", true); | 47 local use_total = module:get_option_boolean("mam_include_total", true); |
48 | 48 |
49 local cleanup; | 49 function schedule_cleanup() |
50 | 50 -- replaced by non-noop later if cleanup is enabled |
51 local function schedule_cleanup(username) | |
52 if cleanup and not cleanup[username] then | |
53 table.insert(cleanup, username); | |
54 cleanup[username] = true; | |
55 end | |
56 end | 51 end |
57 | 52 |
58 -- Handle prefs. | 53 -- Handle prefs. |
59 module:hook("iq/self/"..xmlns_mam..":prefs", function(event) | 54 module:hook("iq/self/"..xmlns_mam..":prefs", function(event) |
60 local origin, stanza = event.origin, event.stanza; | 55 local origin, stanza = event.origin, event.stanza; |
94 local origin, stanza = event.origin, event.stanza; | 89 local origin, stanza = event.origin, event.stanza; |
95 local query = stanza.tags[1]; | 90 local query = stanza.tags[1]; |
96 local qid = query.attr.queryid; | 91 local qid = query.attr.queryid; |
97 | 92 |
98 get_prefs(origin.username, true); | 93 get_prefs(origin.username, true); |
99 schedule_cleanup(origin.username); | |
100 | 94 |
101 -- Search query parameters | 95 -- Search query parameters |
102 local qwith, qstart, qend; | 96 local qwith, qstart, qend; |
103 local form = query:get_child("x", "jabber:x:data"); | 97 local form = query:get_child("x", "jabber:x:data"); |
104 if form then | 98 if form then |
210 end | 204 end |
211 | 205 |
212 local function shall_store(user, who) | 206 local function shall_store(user, who) |
213 -- TODO Cache this? | 207 -- TODO Cache this? |
214 if not um.user_exists(user, host) then | 208 if not um.user_exists(user, host) then |
209 module:log("debug", "%s@%s does not exist", user, host) | |
215 return false; | 210 return false; |
216 end | 211 end |
217 local prefs = get_prefs(user); | 212 local prefs = get_prefs(user); |
218 local rule = prefs[who]; | 213 local rule = prefs[who]; |
219 module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule)); | 214 module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule)); |
327 module:hook("pre-message/full", strip_stanza_id_after_other_events, -1); | 322 module:hook("pre-message/full", strip_stanza_id_after_other_events, -1); |
328 | 323 |
329 local cleanup_after = module:get_option_string("archive_expires_after", "1w"); | 324 local cleanup_after = module:get_option_string("archive_expires_after", "1w"); |
330 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60); | 325 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60); |
331 if cleanup_after ~= "never" then | 326 if cleanup_after ~= "never" then |
327 local cleanup_storage = module:open_store("archive_cleanup"); | |
328 local cleanup_map = module:open_store("archive_cleanup", "map"); | |
329 | |
332 local day = 86400; | 330 local day = 86400; |
333 local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day }; | 331 local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day }; |
334 local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)"); | 332 local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)"); |
335 if not n then | 333 if not n then |
336 module:log("error", "Could not parse archive_expires_after string %q", cleanup_after); | 334 module:log("error", "Could not parse archive_expires_after string %q", cleanup_after); |
344 if not archive.delete then | 342 if not archive.delete then |
345 module:log("error", "archive_expires_after set but mod_%s does not support deleting", archive._provided_by); | 343 module:log("error", "archive_expires_after set but mod_%s does not support deleting", archive._provided_by); |
346 return false; | 344 return false; |
347 end | 345 end |
348 | 346 |
349 -- Set of known users to do message expiry for | 347 -- For each day, store a set of users that have new messages. To expire |
350 -- Populated either below or when new messages are added | 348 -- messages, we collect the union of sets of users from dates that fall |
351 cleanup = {}; | 349 -- outside the cleanup range. |
352 | 350 |
353 -- Iterating over users is not supported by all authentication modules | 351 function schedule_cleanup(username, date) |
354 -- Catch and ignore error if not supported | 352 cleanup_map:set(date or datestamp(), username, true); |
355 pcall(function () | 353 end |
356 -- If this works, then we schedule cleanup for all known users on startup | 354 |
357 for user in um.users(module.host) do | 355 cleanup_runner = require "util.async".runner(function () |
358 schedule_cleanup(user); | 356 local users = {}; |
359 end | 357 local cut_off = datestamp(os.time() - cleanup_after); |
358 for date in cleanup_storage:users() do | |
359 if date < cut_off then | |
360 module:log("debug", "Messages from %q should be expired", date); | |
361 local messages_this_day = cleanup_storage:get(date); | |
362 if messages_this_day then | |
363 for user in pairs(messages_this_day) do | |
364 users[user] = true; | |
365 end | |
366 cleanup_storage:set(date, nil); | |
367 end | |
368 end | |
369 end | |
370 local sum, num_users = 0, 0; | |
371 for user in pairs(users) do | |
372 local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; }) | |
373 if ok then | |
374 num_users = num_users + 1; | |
375 sum = sum + tonumber(ok) or 0; | |
376 end | |
377 end | |
378 module:log("info", "Deleted expired %d messages for %d users", sum, num_users); | |
360 end); | 379 end); |
361 | 380 |
362 -- At odd intervals, delete old messages for one user | 381 cleanup_task = module:add_timer(1, function () |
363 module:add_timer(math.random(10, 60), function() | 382 cleanup_runner:run(true); |
364 local user = table.remove(cleanup, 1); | 383 return cleanup_interval; |
365 if user then | |
366 module:log("debug", "Removing old messages for user %q", user); | |
367 local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; }) | |
368 if not ok then | |
369 module:log("warn", "Could not expire archives for user %s: %s", user, err); | |
370 elseif type(ok) == "number" then | |
371 module:log("debug", "Removed %d messages", ok); | |
372 end | |
373 cleanup[user] = nil; | |
374 end | |
375 return math.random(cleanup_interval, cleanup_interval * 2); | |
376 end); | 384 end); |
377 else | 385 else |
378 module:log("debug", "Archive expiry disabled"); | 386 module:log("debug", "Archive expiry disabled"); |
379 -- Don't ask the backend to count the potentially unbounded number of items, | 387 -- Don't ask the backend to count the potentially unbounded number of items, |
380 -- it'll get slow. | 388 -- it'll get slow. |