Comparison

plugins/mod_smacks.lua @ 12053:03e9587fbfd2

mod_smacks: Switch storage for tracking resumption tokens. All that was a complicated way to limit the number of resumable sessions. Let's control resource usage some other way. This leaves the essence of mapping resumption tokens to live sessions. This keeps resumption state across reloads.
author Kim Alvefur <zash@zash.se>
date Thu, 02 Dec 2021 14:41:19 +0100
parent 12052:d17b8fcf11c7
child 12054:0116fa57f05c
comparison
equal deleted inserted replaced
12052:d17b8fcf11c7 12053:03e9587fbfd2
16 local math_max = math.max; 16 local math_max = math.max;
17 local math_min = math.min; 17 local math_min = math.min;
18 local os_time = os.time; 18 local os_time = os.time;
19 local t_remove = table.remove; 19 local t_remove = table.remove;
20 20
21 local cache = require "util.cache";
22 local datetime = require "util.datetime"; 21 local datetime = require "util.datetime";
23 local add_filter = require "util.filters".add_filter; 22 local add_filter = require "util.filters".add_filter;
24 local jid = require "util.jid"; 23 local jid = require "util.jid";
25 local st = require "util.stanza"; 24 local st = require "util.stanza";
26 local timer = require "util.timer"; 25 local timer = require "util.timer";
42 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); 41 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true);
43 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); 42 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
44 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); 43 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
45 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); 44 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256);
46 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30); 45 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30);
47 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
48
49 assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0");
50 46
51 local c2s_sessions = module:shared("/*/c2s/sessions"); 47 local c2s_sessions = module:shared("/*/c2s/sessions");
52 48
53 local function format_h(h) if h then return string.format("%d", h) end end 49 local function format_h(h) if h then return string.format("%d", h) end end
54 50
55 local function init_session_cache(max_entries, evict_callback)
56 -- use per user limited cache for prosody >= 0.10
57 local stores = {};
58 return {
59 get = function(user, key)
60 if not user then return nil; end
61 if not key then return nil; end
62 if not stores[user] then
63 stores[user] = cache.new(max_entries, evict_callback);
64 end
65 return stores[user]:get(key);
66 end;
67 set = function(user, key, value)
68 if not user then return nil; end
69 if not key then return nil; end
70 if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end
71 stores[user]:set(key, value);
72 -- remove empty caches completely
73 if stores[user]:count() == 0 then stores[user] = nil; end
74 end;
75 };
76 end
77 local old_session_registry = module:open_store("smacks_h", "map"); 51 local old_session_registry = module:open_store("smacks_h", "map");
78 local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) 52 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource
79 if session.destroyed then return true; end -- destroyed session can always be removed from cache
80 session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token);
81 -- store old session's h values on force delete
82 -- save only actual h value and username/host (for security)
83 old_session_registry:set(session.username, resumption_token, {
84 h = session.handled_stanza_count,
85 });
86 return true; -- allow session to be removed from full cache to make room for new one
87 end);
88 53
89 local function ack_delayed(session, stanza) 54 local function ack_delayed(session, stanza)
90 -- fire event only if configured to do so and our session is not already hibernated or destroyed 55 -- fire event only if configured to do so and our session is not already hibernated or destroyed
91 if delayed_ack_timeout > 0 and session.awaiting_ack 56 if delayed_ack_timeout > 0 and session.awaiting_ack
92 and not session.hibernating and not session.destroyed then 57 and not session.hibernating and not session.destroyed then
243 end 208 end
244 209
245 module:hook("pre-session-close", function(event) 210 module:hook("pre-session-close", function(event)
246 local session = event.session; 211 local session = event.session;
247 if session.resumption_token then 212 if session.resumption_token then
248 session_registry.set(session.username, session.resumption_token, nil); 213 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
249 old_session_registry:set(session.username, session.resumption_token, nil); 214 old_session_registry:set(session.username, session.resumption_token, nil);
250 session.resumption_token = nil; 215 session.resumption_token = nil;
251 end 216 end
252 -- send out last ack as per revision 1.5.2 of XEP-0198 217 -- send out last ack as per revision 1.5.2 of XEP-0198
253 if session.smacks and session.conn and session.handled_stanza_count then 218 if session.smacks and session.conn and session.handled_stanza_count then
288 253
289 local resume_token; 254 local resume_token;
290 local resume = stanza.attr.resume; 255 local resume = stanza.attr.resume;
291 if resume == "true" or resume == "1" then 256 if resume == "true" or resume == "1" then
292 resume_token = uuid_generate(); 257 resume_token = uuid_generate();
293 session_registry.set(session.username, resume_token, session); 258 session_registry[jid.join(session.username, session.host, resume_token)] = session;
294 session.resumption_token = resume_token; 259 session.resumption_token = resume_token;
295 end 260 end
296 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) })); 261 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) }));
297 return true; 262 return true;
298 end 263 end
487 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", 452 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds",
488 resume_timeout - (current_time - timeout_start)); 453 resume_timeout - (current_time - timeout_start));
489 return resume_timeout-(current_time-timeout_start); -- time left to wait 454 return resume_timeout-(current_time-timeout_start); -- time left to wait
490 end 455 end
491 session.log("debug", "Destroying session for hibernating too long"); 456 session.log("debug", "Destroying session for hibernating too long");
492 session_registry.set(session.username, session.resumption_token, nil); 457 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
493 -- save only actual h value and username/host (for security) 458 -- save only actual h value and username/host (for security)
494 old_session_registry:set(session.username, session.resumption_token, { 459 old_session_registry:set(session.username, session.resumption_token, {
495 h = session.handled_stanza_count, 460 h = session.handled_stanza_count,
496 }); 461 });
497 session.resumption_token = nil; 462 session.resumption_token = nil;
539 ); 504 );
540 return true; 505 return true;
541 end 506 end
542 507
543 local id = stanza.attr.previd; 508 local id = stanza.attr.previd;
544 local original_session = session_registry.get(session.username, id); 509 local original_session = session_registry[jid.join(session.username, session.host, id)];
545 if not original_session then 510 if not original_session then
546 session.log("debug", "Tried to resume non-existent session with id %s", id); 511 session.log("debug", "Tried to resume non-existent session with id %s", id);
547 local old_session = old_session_registry:get(session.username, id); 512 local old_session = old_session_registry:get(session.username, id);
548 if old_session then 513 if old_session then
549 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) 514 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) })