Software / code / prosody
Comparison
plugins/mod_smacks.lua @ 12677:3b9771d496ed
mod_smacks: Long overdue cleanup of resumption code, fixes some old TODOs
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 26 Aug 2022 17:04:15 +0100 |
parent | 12550:12962a1001c2 |
child | 12678:5a61e1603f42 |
Comparison legend: equal, deleted, inserted, replaced
12676:3ab3ef9584e3 | 12677:3b9771d496ed |
---|---|
194 local function outgoing_stanza_filter(stanza, session) | 194 local function outgoing_stanza_filter(stanza, session) |
195 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's | 195 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's |
196 -- supposed to be nil. | 196 -- supposed to be nil. |
197 -- However, when using mod_smacks with mod_websocket, then mod_websocket's | 197 -- However, when using mod_smacks with mod_websocket, then mod_websocket's |
198 -- stanzas/out filter can get called before this one and adds the xmlns. | 198 -- stanzas/out filter can get called before this one and adds the xmlns. |
199 if session.resending_unacked then return stanza end | |
200 if not session.smacks then return stanza end | 199 if not session.smacks then return stanza end |
201 local is_stanza = st.is_stanza(stanza) and | 200 local is_stanza = st.is_stanza(stanza) and |
202 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') | 201 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') |
203 and not stanza.name:find":"; | 202 and not stanza.name:find":"; |
204 | 203 |
494 end | 493 end |
495 | 494 |
496 session.log("debug", "Destroying session for hibernating too long"); | 495 session.log("debug", "Destroying session for hibernating too long"); |
497 save_old_session(session); | 496 save_old_session(session); |
498 session.resumption_token = nil; | 497 session.resumption_token = nil; |
499 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore | |
500 sessionmanager.destroy_session(session, "Hibernating too long"); | 498 sessionmanager.destroy_session(session, "Hibernating too long"); |
501 sessions_expired(1); | 499 sessions_expired(1); |
502 end); | 500 end); |
503 if session.conn then | 501 if session.conn then |
504 local conn = session.conn; | 502 local conn = session.conn; |
526 end | 524 end |
527 end | 525 end |
528 | 526 |
529 module:hook("s2sout-destroyed", handle_s2s_destroyed); | 527 module:hook("s2sout-destroyed", handle_s2s_destroyed); |
530 module:hook("s2sin-destroyed", handle_s2s_destroyed); | 528 module:hook("s2sin-destroyed", handle_s2s_destroyed); |
531 | |
532 local function get_session_id(session) | |
533 return session.id or (tostring(session):match("[a-f0-9]+$")); | |
534 end | |
535 | 529 |
536 function handle_resume(session, stanza, xmlns_sm) | 530 function handle_resume(session, stanza, xmlns_sm) |
537 if session.full_jid then | 531 if session.full_jid then |
538 session.log("warn", "Tried to resume after resource binding"); | 532 session.log("warn", "Tried to resume after resource binding"); |
539 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 533 session.send(st.stanza("failed", { xmlns = xmlns_sm }) |
571 local age = 0; | 565 local age = 0; |
572 if original_session.hibernating then | 566 if original_session.hibernating then |
573 local now = os_time(); | 567 local now = os_time(); |
574 age = now - original_session.hibernating; | 568 age = now - original_session.hibernating; |
575 end | 569 end |
576 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); | 570 |
577 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); | 571 session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); |
578 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | 572 |
579 if original_session.conn then | 573 -- Update original_session with the parameters (connection, etc.) from the new session |
580 original_session.log("debug", "mod_smacks closing an old connection for this session"); | 574 sessionmanager.update_session(original_session, session); |
581 local conn = original_session.conn; | |
582 c2s_sessions[conn] = nil; | |
583 conn:close(); | |
584 end | |
585 | |
586 local migrated_session_log = session.log; | |
587 original_session.ip = session.ip; | |
588 original_session.conn = session.conn; | |
589 original_session.rawsend = session.rawsend; | |
590 original_session.rawsend.session = original_session; | |
591 original_session.rawsend.conn = original_session.conn; | |
592 original_session.send = session.send; | |
593 original_session.send.session = original_session; | |
594 original_session.close = session.close; | |
595 original_session.filter = session.filter; | |
596 original_session.filter.session = original_session; | |
597 original_session.filters = session.filters; | |
598 original_session.send.filter = original_session.filter; | |
599 original_session.stream = session.stream; | |
600 original_session.secure = session.secure; | |
601 original_session.hibernating = nil; | |
602 original_session.resumption_counter = (original_session.resumption_counter or 0) + 1; | |
603 session.log = original_session.log; | |
604 session.type = original_session.type; | |
605 wrap_session(original_session, true); | |
606 -- Inform xmppstream of the new session (passed to its callbacks) | |
607 original_session.stream:set_session(original_session); | |
608 -- Similar for connlisteners | |
609 c2s_sessions[session.conn] = original_session; | |
610 | 575 |
611 local queue = original_session.outgoing_stanza_queue; | 576 local queue = original_session.outgoing_stanza_queue; |
612 local h = tonumber(stanza.attr.h); | 577 local h = tonumber(stanza.attr.h); |
613 | 578 |
614 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) | 579 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) |
631 -- Ok, we need to re-send any stanzas that the client didn't see | 596 -- Ok, we need to re-send any stanzas that the client didn't see |
632 -- ...they are what is now left in the outgoing stanza queue | 597 -- ...they are what is now left in the outgoing stanza queue |
633 -- We have to use the send of "session" because we don't want to add our resent stanzas | 598 -- We have to use the send of "session" because we don't want to add our resent stanzas |
634 -- to the outgoing queue again | 599 -- to the outgoing queue again |
635 | 600 |
636 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); | 601 original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); |
637 -- FIXME Which session is it that the queue filter sees? | |
638 session.resending_unacked = true; | |
639 original_session.resending_unacked = true; | |
640 for _, queued_stanza in queue:resume() do | 602 for _, queued_stanza in queue:resume() do |
641 session.send(queued_stanza); | 603 original_session.send(queued_stanza); |
642 end | 604 end |
643 session.resending_unacked = nil; | 605 session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); |
644 original_session.resending_unacked = nil; | 606 |
645 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked()); | 607 -- Add our own handlers to the resumed session (filters have been reset in the update) |
646 function session.send(stanza) -- luacheck: ignore 432 | 608 wrap_session(original_session, true); |
647 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | 609 |
648 return false; | 610 -- Let everyone know that we are no longer hibernating |
649 end | |
650 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); | 611 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); |
651 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption | 612 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption |
652 request_ack_now_if_needed(original_session, true, "handle_resume", nil); | 613 request_ack_now_if_needed(original_session, true, "handle_resume", nil); |
653 resumption_age:sample(age); | 614 resumption_age:sample(age); |
654 end | 615 end |
655 return true; | 616 return true; |
656 end | 617 end |
618 | |
657 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 619 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |
658 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | 620 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); |
659 | 621 |
660 -- Events when it's sensible to request an ack | 622 -- Events when it's sensible to request an ack |
661 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? | 623 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? |