Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 12053:03e9587fbfd2
mod_smacks: Switch storage for tracking resumption tokens
All that was a complicated way to limit the number of resumable
sessions. Let's control resource usage some other way. This leaves the
essence of mapping resumption tokens to live sessions.
This keeps resumption state across reloads.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 02 Dec 2021 14:41:19 +0100 |
parent | 12052:d17b8fcf11c7 |
child | 12054:0116fa57f05c |
comparison
equal
deleted
inserted
replaced
12052:d17b8fcf11c7 | 12053:03e9587fbfd2 |
---|---|
16 local math_max = math.max; | 16 local math_max = math.max; |
17 local math_min = math.min; | 17 local math_min = math.min; |
18 local os_time = os.time; | 18 local os_time = os.time; |
19 local t_remove = table.remove; | 19 local t_remove = table.remove; |
20 | 20 |
21 local cache = require "util.cache"; | |
22 local datetime = require "util.datetime"; | 21 local datetime = require "util.datetime"; |
23 local add_filter = require "util.filters".add_filter; | 22 local add_filter = require "util.filters".add_filter; |
24 local jid = require "util.jid"; | 23 local jid = require "util.jid"; |
25 local st = require "util.stanza"; | 24 local st = require "util.stanza"; |
26 local timer = require "util.timer"; | 25 local timer = require "util.timer"; |
42 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); | 41 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); |
43 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); | 42 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); |
44 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); | 43 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); |
45 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); | 44 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); |
46 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30); | 45 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30); |
47 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10); | |
48 | |
49 assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0"); | |
50 | 46 |
51 local c2s_sessions = module:shared("/*/c2s/sessions"); | 47 local c2s_sessions = module:shared("/*/c2s/sessions"); |
52 | 48 |
53 local function format_h(h) if h then return string.format("%d", h) end end | 49 local function format_h(h) if h then return string.format("%d", h) end end |
54 | 50 |
55 local function init_session_cache(max_entries, evict_callback) | |
56 -- use per user limited cache for prosody >= 0.10 | |
57 local stores = {}; | |
58 return { | |
59 get = function(user, key) | |
60 if not user then return nil; end | |
61 if not key then return nil; end | |
62 if not stores[user] then | |
63 stores[user] = cache.new(max_entries, evict_callback); | |
64 end | |
65 return stores[user]:get(key); | |
66 end; | |
67 set = function(user, key, value) | |
68 if not user then return nil; end | |
69 if not key then return nil; end | |
70 if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end | |
71 stores[user]:set(key, value); | |
72 -- remove empty caches completely | |
73 if stores[user]:count() == 0 then stores[user] = nil; end | |
74 end; | |
75 }; | |
76 end | |
77 local old_session_registry = module:open_store("smacks_h", "map"); | 51 local old_session_registry = module:open_store("smacks_h", "map"); |
78 local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) | 52 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource |
79 if session.destroyed then return true; end -- destroyed session can always be removed from cache | |
80 session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token); | |
81 -- store old session's h values on force delete | |
82 -- save only actual h value and username/host (for security) | |
83 old_session_registry:set(session.username, resumption_token, { | |
84 h = session.handled_stanza_count, | |
85 }); | |
86 return true; -- allow session to be removed from full cache to make room for new one | |
87 end); | |
88 | 53 |
89 local function ack_delayed(session, stanza) | 54 local function ack_delayed(session, stanza) |
90 -- fire event only if configured to do so and our session is not already hibernated or destroyed | 55 -- fire event only if configured to do so and our session is not already hibernated or destroyed |
91 if delayed_ack_timeout > 0 and session.awaiting_ack | 56 if delayed_ack_timeout > 0 and session.awaiting_ack |
92 and not session.hibernating and not session.destroyed then | 57 and not session.hibernating and not session.destroyed then |
243 end | 208 end |
244 | 209 |
245 module:hook("pre-session-close", function(event) | 210 module:hook("pre-session-close", function(event) |
246 local session = event.session; | 211 local session = event.session; |
247 if session.resumption_token then | 212 if session.resumption_token then |
248 session_registry.set(session.username, session.resumption_token, nil); | 213 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; |
249 old_session_registry:set(session.username, session.resumption_token, nil); | 214 old_session_registry:set(session.username, session.resumption_token, nil); |
250 session.resumption_token = nil; | 215 session.resumption_token = nil; |
251 end | 216 end |
252 -- send out last ack as per revision 1.5.2 of XEP-0198 | 217 -- send out last ack as per revision 1.5.2 of XEP-0198 |
253 if session.smacks and session.conn and session.handled_stanza_count then | 218 if session.smacks and session.conn and session.handled_stanza_count then |
288 | 253 |
289 local resume_token; | 254 local resume_token; |
290 local resume = stanza.attr.resume; | 255 local resume = stanza.attr.resume; |
291 if resume == "true" or resume == "1" then | 256 if resume == "true" or resume == "1" then |
292 resume_token = uuid_generate(); | 257 resume_token = uuid_generate(); |
293 session_registry.set(session.username, resume_token, session); | 258 session_registry[jid.join(session.username, session.host, resume_token)] = session; |
294 session.resumption_token = resume_token; | 259 session.resumption_token = resume_token; |
295 end | 260 end |
296 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) })); | 261 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) })); |
297 return true; | 262 return true; |
298 end | 263 end |
487 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", | 452 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", |
488 resume_timeout - (current_time - timeout_start)); | 453 resume_timeout - (current_time - timeout_start)); |
489 return resume_timeout-(current_time-timeout_start); -- time left to wait | 454 return resume_timeout-(current_time-timeout_start); -- time left to wait |
490 end | 455 end |
491 session.log("debug", "Destroying session for hibernating too long"); | 456 session.log("debug", "Destroying session for hibernating too long"); |
492 session_registry.set(session.username, session.resumption_token, nil); | 457 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; |
493 -- save only actual h value and username/host (for security) | 458 -- save only actual h value and username/host (for security) |
494 old_session_registry:set(session.username, session.resumption_token, { | 459 old_session_registry:set(session.username, session.resumption_token, { |
495 h = session.handled_stanza_count, | 460 h = session.handled_stanza_count, |
496 }); | 461 }); |
497 session.resumption_token = nil; | 462 session.resumption_token = nil; |
539 ); | 504 ); |
540 return true; | 505 return true; |
541 end | 506 end |
542 | 507 |
543 local id = stanza.attr.previd; | 508 local id = stanza.attr.previd; |
544 local original_session = session_registry.get(session.username, id); | 509 local original_session = session_registry[jid.join(session.username, session.host, id)]; |
545 if not original_session then | 510 if not original_session then |
546 session.log("debug", "Tried to resume non-existent session with id %s", id); | 511 session.log("debug", "Tried to resume non-existent session with id %s", id); |
547 local old_session = old_session_registry:get(session.username, id); | 512 local old_session = old_session_registry:get(session.username, id); |
548 if old_session then | 513 if old_session then |
549 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) | 514 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) |