Software / code / prosody
Comparison
plugins/mod_smacks.lua @ 12677:3b9771d496ed
mod_smacks: Long overdue cleanup of resumption code, fixes some old TODOs
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Fri, 26 Aug 2022 17:04:15 +0100 |
| parent | 12550:12962a1001c2 |
| child | 12678:5a61e1603f42 |
comparison
equal
deleted
inserted
replaced
| 12676:3ab3ef9584e3 | 12677:3b9771d496ed |
|---|---|
| 194 local function outgoing_stanza_filter(stanza, session) | 194 local function outgoing_stanza_filter(stanza, session) |
| 195 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's | 195 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's |
| 196 -- supposed to be nil. | 196 -- supposed to be nil. |
| 197 -- However, when using mod_smacks with mod_websocket, then mod_websocket's | 197 -- However, when using mod_smacks with mod_websocket, then mod_websocket's |
| 198 -- stanzas/out filter can get called before this one and adds the xmlns. | 198 -- stanzas/out filter can get called before this one and adds the xmlns. |
| 199 if session.resending_unacked then return stanza end | |
| 200 if not session.smacks then return stanza end | 199 if not session.smacks then return stanza end |
| 201 local is_stanza = st.is_stanza(stanza) and | 200 local is_stanza = st.is_stanza(stanza) and |
| 202 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') | 201 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') |
| 203 and not stanza.name:find":"; | 202 and not stanza.name:find":"; |
| 204 | 203 |
| 494 end | 493 end |
| 495 | 494 |
| 496 session.log("debug", "Destroying session for hibernating too long"); | 495 session.log("debug", "Destroying session for hibernating too long"); |
| 497 save_old_session(session); | 496 save_old_session(session); |
| 498 session.resumption_token = nil; | 497 session.resumption_token = nil; |
| 499 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore | |
| 500 sessionmanager.destroy_session(session, "Hibernating too long"); | 498 sessionmanager.destroy_session(session, "Hibernating too long"); |
| 501 sessions_expired(1); | 499 sessions_expired(1); |
| 502 end); | 500 end); |
| 503 if session.conn then | 501 if session.conn then |
| 504 local conn = session.conn; | 502 local conn = session.conn; |
| 526 end | 524 end |
| 527 end | 525 end |
| 528 | 526 |
| 529 module:hook("s2sout-destroyed", handle_s2s_destroyed); | 527 module:hook("s2sout-destroyed", handle_s2s_destroyed); |
| 530 module:hook("s2sin-destroyed", handle_s2s_destroyed); | 528 module:hook("s2sin-destroyed", handle_s2s_destroyed); |
| 531 | |
| 532 local function get_session_id(session) | |
| 533 return session.id or (tostring(session):match("[a-f0-9]+$")); | |
| 534 end | |
| 535 | 529 |
| 536 function handle_resume(session, stanza, xmlns_sm) | 530 function handle_resume(session, stanza, xmlns_sm) |
| 537 if session.full_jid then | 531 if session.full_jid then |
| 538 session.log("warn", "Tried to resume after resource binding"); | 532 session.log("warn", "Tried to resume after resource binding"); |
| 539 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 533 session.send(st.stanza("failed", { xmlns = xmlns_sm }) |
| 571 local age = 0; | 565 local age = 0; |
| 572 if original_session.hibernating then | 566 if original_session.hibernating then |
| 573 local now = os_time(); | 567 local now = os_time(); |
| 574 age = now - original_session.hibernating; | 568 age = now - original_session.hibernating; |
| 575 end | 569 end |
| 576 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); | 570 |
| 577 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); | 571 session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); |
| 578 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | 572 |
| 579 if original_session.conn then | 573 -- Update original_session with the parameters (connection, etc.) from the new session |
| 580 original_session.log("debug", "mod_smacks closing an old connection for this session"); | 574 sessionmanager.update_session(original_session, session); |
| 581 local conn = original_session.conn; | |
| 582 c2s_sessions[conn] = nil; | |
| 583 conn:close(); | |
| 584 end | |
| 585 | |
| 586 local migrated_session_log = session.log; | |
| 587 original_session.ip = session.ip; | |
| 588 original_session.conn = session.conn; | |
| 589 original_session.rawsend = session.rawsend; | |
| 590 original_session.rawsend.session = original_session; | |
| 591 original_session.rawsend.conn = original_session.conn; | |
| 592 original_session.send = session.send; | |
| 593 original_session.send.session = original_session; | |
| 594 original_session.close = session.close; | |
| 595 original_session.filter = session.filter; | |
| 596 original_session.filter.session = original_session; | |
| 597 original_session.filters = session.filters; | |
| 598 original_session.send.filter = original_session.filter; | |
| 599 original_session.stream = session.stream; | |
| 600 original_session.secure = session.secure; | |
| 601 original_session.hibernating = nil; | |
| 602 original_session.resumption_counter = (original_session.resumption_counter or 0) + 1; | |
| 603 session.log = original_session.log; | |
| 604 session.type = original_session.type; | |
| 605 wrap_session(original_session, true); | |
| 606 -- Inform xmppstream of the new session (passed to its callbacks) | |
| 607 original_session.stream:set_session(original_session); | |
| 608 -- Similar for connlisteners | |
| 609 c2s_sessions[session.conn] = original_session; | |
| 610 | 575 |
| 611 local queue = original_session.outgoing_stanza_queue; | 576 local queue = original_session.outgoing_stanza_queue; |
| 612 local h = tonumber(stanza.attr.h); | 577 local h = tonumber(stanza.attr.h); |
| 613 | 578 |
| 614 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) | 579 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) |
| 631 -- Ok, we need to re-send any stanzas that the client didn't see | 596 -- Ok, we need to re-send any stanzas that the client didn't see |
| 632 -- ...they are what is now left in the outgoing stanza queue | 597 -- ...they are what is now left in the outgoing stanza queue |
| 633 -- We have to use the send of "session" because we don't want to add our resent stanzas | 598 -- We have to use the send of "session" because we don't want to add our resent stanzas |
| 634 -- to the outgoing queue again | 599 -- to the outgoing queue again |
| 635 | 600 |
| 636 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); | 601 original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); |
| 637 -- FIXME Which session is it that the queue filter sees? | |
| 638 session.resending_unacked = true; | |
| 639 original_session.resending_unacked = true; | |
| 640 for _, queued_stanza in queue:resume() do | 602 for _, queued_stanza in queue:resume() do |
| 641 session.send(queued_stanza); | 603 original_session.send(queued_stanza); |
| 642 end | 604 end |
| 643 session.resending_unacked = nil; | 605 session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); |
| 644 original_session.resending_unacked = nil; | 606 |
| 645 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked()); | 607 -- Add our own handlers to the resumed session (filters have been reset in the update) |
| 646 function session.send(stanza) -- luacheck: ignore 432 | 608 wrap_session(original_session, true); |
| 647 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | 609 |
| 648 return false; | 610 -- Let everyone know that we are no longer hibernating |
| 649 end | |
| 650 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); | 611 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); |
| 651 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption | 612 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption |
| 652 request_ack_now_if_needed(original_session, true, "handle_resume", nil); | 613 request_ack_now_if_needed(original_session, true, "handle_resume", nil); |
| 653 resumption_age:sample(age); | 614 resumption_age:sample(age); |
| 654 end | 615 end |
| 655 return true; | 616 return true; |
| 656 end | 617 end |
| 618 | |
| 657 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 619 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |
| 658 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | 620 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); |
| 659 | 621 |
| 660 -- Events when it's sensible to request an ack | 622 -- Events when it's sensible to request an ack |
| 661 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? | 623 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? |