Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 12054:0116fa57f05c
mod_smacks: Set a watchdog to watch sleeping sessions
Extending the timeout by poking the watchdog, and cancelling it on
resumption, should be much better than the previous method.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 02 Dec 2021 02:46:26 +0100 |
parent | 12053:03e9587fbfd2 |
child | 12056:e62025f949f9 |
comparison
equal
deleted
inserted
replaced
12053:03e9587fbfd2 | 12054:0116fa57f05c |
---|---|
11 -- COPYING file in the source package for more information. | 11 -- COPYING file in the source package for more information. |
12 -- | 12 -- |
13 | 13 |
14 local tonumber = tonumber; | 14 local tonumber = tonumber; |
15 local tostring = tostring; | 15 local tostring = tostring; |
16 local math_max = math.max; | |
17 local math_min = math.min; | 16 local math_min = math.min; |
18 local os_time = os.time; | 17 local os_time = os.time; |
19 local t_remove = table.remove; | 18 local t_remove = table.remove; |
20 | 19 |
21 local datetime = require "util.datetime"; | 20 local datetime = require "util.datetime"; |
22 local add_filter = require "util.filters".add_filter; | 21 local add_filter = require "util.filters".add_filter; |
23 local jid = require "util.jid"; | 22 local jid = require "util.jid"; |
24 local st = require "util.stanza"; | 23 local st = require "util.stanza"; |
25 local timer = require "util.timer"; | 24 local timer = require "util.timer"; |
26 local uuid_generate = require "util.uuid".generate; | 25 local uuid_generate = require "util.uuid".generate; |
| | 26 local watchdog = require "util.watchdog"; |
27 | 27 |
28 local sessionmanager = require "core.sessionmanager"; | 28 local sessionmanager = require "core.sessionmanager"; |
29 local core_process_stanza = prosody.core_process_stanza; | 29 local core_process_stanza = prosody.core_process_stanza; |
30 | 30 |
31 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; | 31 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; |
411 end | 411 end |
412 end); | 412 end); |
413 | 413 |
414 module:hook("pre-resource-unbind", function (event) | 414 module:hook("pre-resource-unbind", function (event) |
415 local session = event.session; | 415 local session = event.session; |
416 if session.smacks then | 416 if not session.smacks then return end |
417 if not session.resumption_token then | 417 if not session.resumption_token then |
418 local queue = session.outgoing_stanza_queue; | 418 local queue = session.outgoing_stanza_queue; |
419 if #queue > 0 then | 419 if #queue > 0 then |
420 session.log("debug", "Destroying session with %d unacked stanzas", #queue); | 420 session.log("debug", "Destroying session with %d unacked stanzas", #queue); |
421 handle_unacked_stanzas(session); | 421 handle_unacked_stanzas(session); |
422 end | 422 end |
423 else | 423 return |
424 session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout); | 424 end |
425 local hibernate_time = os_time(); -- Track the time we went into hibernation | 425 |
426 session.hibernating = hibernate_time; | 426 session.hibernating = os_time(); |
427 local resumption_token = session.resumption_token; | 427 session.hibernating_watchdog = watchdog.new(resume_timeout, function() |
428 module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue}); | 428 session.log("debug", "mod_smacks hibernation timeout reached..."); |
429 timer.add_task(resume_timeout, function () | 429 if session.destroyed then |
430 session.log("debug", "mod_smacks hibernation timeout reached..."); | 430 session.log("debug", "The session has already been destroyed"); |
431 -- We need to check the current resumption token for this resource | 431 return |
432 -- matches the smacks session this timer is for in case it changed | 432 end |
433 -- (for example, the client may have bound a new resource and | 433 |
434 -- started a new smacks session, or not be using smacks) | 434 session.log("debug", "Destroying session for hibernating too long"); |
435 local curr_session = prosody.full_sessions[session.full_jid]; | 435 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; |
436 if session.destroyed then | 436 old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count }); |
437 session.log("debug", "The session has already been destroyed"); | 437 session.resumption_token = nil; |
438 elseif curr_session and curr_session.resumption_token == resumption_token | 438 sessionmanager.destroy_session(session, "Hibernating too long"); |
439 -- Check the hibernate time still matches what we think it is, | 439 end); |
440 -- otherwise the session resumed and re-hibernated. | 440 if session.conn then |
441 and session.hibernating == hibernate_time then | 441 session.conn:close(); |
442 -- wait longer if the timeout isn't reached because push was enabled for this session | 442 end |
443 -- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients | 443 module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue }); |
444 -- wait for an additional resume_timeout seconds if no push occurred since hibernation at all | 444 return true; -- Postpone destruction for now |
445 local current_time = os_time(); | |
446 local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating); | |
447 if session.push_identifier ~= nil and not session.first_hibernated_push then | |
448 session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout); | |
449 return resume_timeout; | |
450 end | |
451 if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then | |
452 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", | |
453 resume_timeout - (current_time - timeout_start)); | |
454 return resume_timeout-(current_time-timeout_start); -- time left to wait | |
455 end | |
456 session.log("debug", "Destroying session for hibernating too long"); | |
457 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; | |
458 -- save only actual h value and username/host (for security) | |
459 old_session_registry:set(session.username, session.resumption_token, { | |
460 h = session.handled_stanza_count, | |
461 }); | |
462 session.resumption_token = nil; | |
463 sessionmanager.destroy_session(session, "Hibernating too long"); | |
464 else | |
465 session.log("debug", "Session resumed before hibernation timeout, all is well") | |
466 end | |
467 end); | |
468 if session.conn then | |
469 session.conn:close(); | |
470 end | |
471 return true; -- Postpone destruction for now | |
472 end | |
473 end | |
474 end); | 445 end); |
475 | 446 |
476 local function handle_s2s_destroyed(event) | 447 local function handle_s2s_destroyed(event) |
477 local session = event.session; | 448 local session = event.session; |
478 local queue = session.outgoing_stanza_queue; | 449 local queue = session.outgoing_stanza_queue; |
518 else | 489 else |
519 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 490 session.send(st.stanza("failed", { xmlns = xmlns_sm }) |
520 :tag("item-not-found", { xmlns = xmlns_errors }) | 491 :tag("item-not-found", { xmlns = xmlns_errors }) |
521 ); | 492 ); |
522 end; | 493 end; |
523 elseif session.username == original_session.username | 494 else |
524 and session.host == original_session.host then | 495 if original_session.hibernating_watchdog then |
| | 496 original_session.hibernating_watchdog:cancel(); |
| | 497 original_session.hibernating_watchdog = nil; |
| | 498 end |
525 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); | 499 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); |
526 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); | 500 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); |
527 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | 501 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) |
528 if original_session.conn then | 502 if original_session.conn then |
529 original_session.log("debug", "mod_smacks closing an old connection for this session"); | 503 original_session.log("debug", "mod_smacks closing an old connection for this session"); |
582 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | 556 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); |
583 return false; | 557 return false; |
584 end | 558 end |
585 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); | 559 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); |
586 request_ack_if_needed(original_session, true, "handle_resume", nil); | 560 request_ack_if_needed(original_session, true, "handle_resume", nil); |
587 else | |
588 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", | |
589 session.username or "?", session.host or "?", session.type, | |
590 original_session.username or "?", original_session.host or "?", original_session.type); | |
591 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | |
592 :tag("not-authorized", { xmlns = xmlns_errors })); | |
593 end | 561 end |
594 return true; | 562 return true; |
595 end | 563 end |
596 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 564 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |
597 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | 565 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); |