Comparison

plugins/mod_smacks.lua @ 12054:0116fa57f05c

mod_smacks: Set a watchdog to watch sleeping sessions. Extending the timeout by poking the watchdog, and letting it go on resumption, should be much better than the previous method.
author Kim Alvefur <zash@zash.se>
date Thu, 02 Dec 2021 02:46:26 +0100
parent 12053:03e9587fbfd2
child 12056:e62025f949f9
comparison
equal deleted inserted replaced
12053:03e9587fbfd2 12054:0116fa57f05c
11 -- COPYING file in the source package for more information. 11 -- COPYING file in the source package for more information.
12 -- 12 --
13 13
14 local tonumber = tonumber; 14 local tonumber = tonumber;
15 local tostring = tostring; 15 local tostring = tostring;
16 local math_max = math.max;
17 local math_min = math.min; 16 local math_min = math.min;
18 local os_time = os.time; 17 local os_time = os.time;
19 local t_remove = table.remove; 18 local t_remove = table.remove;
20 19
21 local datetime = require "util.datetime"; 20 local datetime = require "util.datetime";
22 local add_filter = require "util.filters".add_filter; 21 local add_filter = require "util.filters".add_filter;
23 local jid = require "util.jid"; 22 local jid = require "util.jid";
24 local st = require "util.stanza"; 23 local st = require "util.stanza";
25 local timer = require "util.timer"; 24 local timer = require "util.timer";
26 local uuid_generate = require "util.uuid".generate; 25 local uuid_generate = require "util.uuid".generate;
26 local watchdog = require "util.watchdog";
27 27
28 local sessionmanager = require "core.sessionmanager"; 28 local sessionmanager = require "core.sessionmanager";
29 local core_process_stanza = prosody.core_process_stanza; 29 local core_process_stanza = prosody.core_process_stanza;
30 30
31 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; 31 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
411 end 411 end
412 end); 412 end);
413 413
414 module:hook("pre-resource-unbind", function (event) 414 module:hook("pre-resource-unbind", function (event)
415 local session = event.session; 415 local session = event.session;
416 if session.smacks then 416 if not session.smacks then return end
417 if not session.resumption_token then 417 if not session.resumption_token then
418 local queue = session.outgoing_stanza_queue; 418 local queue = session.outgoing_stanza_queue;
419 if #queue > 0 then 419 if #queue > 0 then
420 session.log("debug", "Destroying session with %d unacked stanzas", #queue); 420 session.log("debug", "Destroying session with %d unacked stanzas", #queue);
421 handle_unacked_stanzas(session); 421 handle_unacked_stanzas(session);
422 end 422 end
423 else 423 return
424 session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout); 424 end
425 local hibernate_time = os_time(); -- Track the time we went into hibernation 425
426 session.hibernating = hibernate_time; 426 session.hibernating = os_time();
427 local resumption_token = session.resumption_token; 427 session.hibernating_watchdog = watchdog.new(resume_timeout, function()
428 module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue}); 428 session.log("debug", "mod_smacks hibernation timeout reached...");
429 timer.add_task(resume_timeout, function () 429 if session.destroyed then
430 session.log("debug", "mod_smacks hibernation timeout reached..."); 430 session.log("debug", "The session has already been destroyed");
431 -- We need to check the current resumption token for this resource 431 return
432 -- matches the smacks session this timer is for in case it changed 432 end
433 -- (for example, the client may have bound a new resource and 433
434 -- started a new smacks session, or not be using smacks) 434 session.log("debug", "Destroying session for hibernating too long");
435 local curr_session = prosody.full_sessions[session.full_jid]; 435 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
436 if session.destroyed then 436 old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count });
437 session.log("debug", "The session has already been destroyed"); 437 session.resumption_token = nil;
438 elseif curr_session and curr_session.resumption_token == resumption_token 438 sessionmanager.destroy_session(session, "Hibernating too long");
439 -- Check the hibernate time still matches what we think it is, 439 end);
440 -- otherwise the session resumed and re-hibernated. 440 if session.conn then
441 and session.hibernating == hibernate_time then 441 session.conn:close();
442 -- wait longer if the timeout isn't reached because push was enabled for this session 442 end
443 -- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients 443 module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue });
444 -- wait for an additional resume_timeout seconds if no push occurred since hibernation at all 444 return true; -- Postpone destruction for now
445 local current_time = os_time();
446 local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating);
447 if session.push_identifier ~= nil and not session.first_hibernated_push then
448 session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout);
449 return resume_timeout;
450 end
451 if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then
452 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds",
453 resume_timeout - (current_time - timeout_start));
454 return resume_timeout-(current_time-timeout_start); -- time left to wait
455 end
456 session.log("debug", "Destroying session for hibernating too long");
457 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
458 -- save only actual h value and username/host (for security)
459 old_session_registry:set(session.username, session.resumption_token, {
460 h = session.handled_stanza_count,
461 });
462 session.resumption_token = nil;
463 sessionmanager.destroy_session(session, "Hibernating too long");
464 else
465 session.log("debug", "Session resumed before hibernation timeout, all is well")
466 end
467 end);
468 if session.conn then
469 session.conn:close();
470 end
471 return true; -- Postpone destruction for now
472 end
473 end
474 end); 445 end);
475 446
476 local function handle_s2s_destroyed(event) 447 local function handle_s2s_destroyed(event)
477 local session = event.session; 448 local session = event.session;
478 local queue = session.outgoing_stanza_queue; 449 local queue = session.outgoing_stanza_queue;
518 else 489 else
519 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 490 session.send(st.stanza("failed", { xmlns = xmlns_sm })
520 :tag("item-not-found", { xmlns = xmlns_errors }) 491 :tag("item-not-found", { xmlns = xmlns_errors })
521 ); 492 );
522 end; 493 end;
523 elseif session.username == original_session.username 494 else
524 and session.host == original_session.host then 495 if original_session.hibernating_watchdog then
496 original_session.hibernating_watchdog:cancel();
497 original_session.hibernating_watchdog = nil;
498 end
525 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); 499 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
526 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); 500 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
527 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) 501 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
528 if original_session.conn then 502 if original_session.conn then
529 original_session.log("debug", "mod_smacks closing an old connection for this session"); 503 original_session.log("debug", "mod_smacks closing an old connection for this session");
582 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); 556 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
583 return false; 557 return false;
584 end 558 end
585 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); 559 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
586 request_ack_if_needed(original_session, true, "handle_resume", nil); 560 request_ack_if_needed(original_session, true, "handle_resume", nil);
587 else
588 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
589 session.username or "?", session.host or "?", session.type,
590 original_session.username or "?", original_session.host or "?", original_session.type);
591 session.send(st.stanza("failed", { xmlns = xmlns_sm })
592 :tag("not-authorized", { xmlns = xmlns_errors }));
593 end 561 end
594 return true; 562 return true;
595 end 563 end
596 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); 564 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
597 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); 565 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);