Comparison

plugins/mod_smacks.lua @ 12677:3b9771d496ed

mod_smacks: Long overdue cleanup of resumption code, fixes some old TODOs
author Matthew Wild <mwild1@gmail.com>
date Fri, 26 Aug 2022 17:04:15 +0100
parent 12550:12962a1001c2
child 12678:5a61e1603f42
comparison
Legend: equal | deleted | inserted | replaced
12676:3ab3ef9584e3 12677:3b9771d496ed
194 local function outgoing_stanza_filter(stanza, session) 194 local function outgoing_stanza_filter(stanza, session)
195 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's 195 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's
196 -- supposed to be nil. 196 -- supposed to be nil.
197 -- However, when using mod_smacks with mod_websocket, then mod_websocket's 197 -- However, when using mod_smacks with mod_websocket, then mod_websocket's
198 -- stanzas/out filter can get called before this one and adds the xmlns. 198 -- stanzas/out filter can get called before this one and adds the xmlns.
199 if session.resending_unacked then return stanza end
200 if not session.smacks then return stanza end 199 if not session.smacks then return stanza end
201 local is_stanza = st.is_stanza(stanza) and 200 local is_stanza = st.is_stanza(stanza) and
202 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') 201 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client')
203 and not stanza.name:find":"; 202 and not stanza.name:find":";
204 203
494 end 493 end
495 494
496 session.log("debug", "Destroying session for hibernating too long"); 495 session.log("debug", "Destroying session for hibernating too long");
497 save_old_session(session); 496 save_old_session(session);
498 session.resumption_token = nil; 497 session.resumption_token = nil;
499 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
500 sessionmanager.destroy_session(session, "Hibernating too long"); 498 sessionmanager.destroy_session(session, "Hibernating too long");
501 sessions_expired(1); 499 sessions_expired(1);
502 end); 500 end);
503 if session.conn then 501 if session.conn then
504 local conn = session.conn; 502 local conn = session.conn;
526 end 524 end
527 end 525 end
528 526
529 module:hook("s2sout-destroyed", handle_s2s_destroyed); 527 module:hook("s2sout-destroyed", handle_s2s_destroyed);
530 module:hook("s2sin-destroyed", handle_s2s_destroyed); 528 module:hook("s2sin-destroyed", handle_s2s_destroyed);
531
532 local function get_session_id(session)
533 return session.id or (tostring(session):match("[a-f0-9]+$"));
534 end
535 529
536 function handle_resume(session, stanza, xmlns_sm) 530 function handle_resume(session, stanza, xmlns_sm)
537 if session.full_jid then 531 if session.full_jid then
538 session.log("warn", "Tried to resume after resource binding"); 532 session.log("warn", "Tried to resume after resource binding");
539 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 533 session.send(st.stanza("failed", { xmlns = xmlns_sm })
571 local age = 0; 565 local age = 0;
572 if original_session.hibernating then 566 if original_session.hibernating then
573 local now = os_time(); 567 local now = os_time();
574 age = now - original_session.hibernating; 568 age = now - original_session.hibernating;
575 end 569 end
576 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); 570
577 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); 571 session.log("debug", "mod_smacks resuming existing session %s...", original_session.id);
578 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) 572
579 if original_session.conn then 573 -- Update original_session with the parameters (connection, etc.) from the new session
580 original_session.log("debug", "mod_smacks closing an old connection for this session"); 574 sessionmanager.update_session(original_session, session);
581 local conn = original_session.conn;
582 c2s_sessions[conn] = nil;
583 conn:close();
584 end
585
586 local migrated_session_log = session.log;
587 original_session.ip = session.ip;
588 original_session.conn = session.conn;
589 original_session.rawsend = session.rawsend;
590 original_session.rawsend.session = original_session;
591 original_session.rawsend.conn = original_session.conn;
592 original_session.send = session.send;
593 original_session.send.session = original_session;
594 original_session.close = session.close;
595 original_session.filter = session.filter;
596 original_session.filter.session = original_session;
597 original_session.filters = session.filters;
598 original_session.send.filter = original_session.filter;
599 original_session.stream = session.stream;
600 original_session.secure = session.secure;
601 original_session.hibernating = nil;
602 original_session.resumption_counter = (original_session.resumption_counter or 0) + 1;
603 session.log = original_session.log;
604 session.type = original_session.type;
605 wrap_session(original_session, true);
606 -- Inform xmppstream of the new session (passed to its callbacks)
607 original_session.stream:set_session(original_session);
608 -- Similar for connlisteners
609 c2s_sessions[session.conn] = original_session;
610 575
611 local queue = original_session.outgoing_stanza_queue; 576 local queue = original_session.outgoing_stanza_queue;
612 local h = tonumber(stanza.attr.h); 577 local h = tonumber(stanza.attr.h);
613 578
614 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) 579 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
631 -- Ok, we need to re-send any stanzas that the client didn't see 596 -- Ok, we need to re-send any stanzas that the client didn't see
632 -- ...they are what is now left in the outgoing stanza queue 597 -- ...they are what is now left in the outgoing stanza queue
633 -- We have to use the send of "session" because we don't want to add our resent stanzas 598 -- We have to use the send of "session" because we don't want to add our resent stanzas
634 -- to the outgoing queue again 599 -- to the outgoing queue again
635 600
636 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); 601 original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
637 -- FIXME Which session is it that the queue filter sees?
638 session.resending_unacked = true;
639 original_session.resending_unacked = true;
640 for _, queued_stanza in queue:resume() do 602 for _, queued_stanza in queue:resume() do
641 session.send(queued_stanza); 603 original_session.send(queued_stanza);
642 end 604 end
643 session.resending_unacked = nil; 605 session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked());
644 original_session.resending_unacked = nil; 606
645 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked()); 607 -- Add our own handlers to the resumed session (filters have been reset in the update)
646 function session.send(stanza) -- luacheck: ignore 432 608 wrap_session(original_session, true);
647 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); 609
648 return false; 610 -- Let everyone know that we are no longer hibernating
649 end
650 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); 611 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
651 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption 612 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
652 request_ack_now_if_needed(original_session, true, "handle_resume", nil); 613 request_ack_now_if_needed(original_session, true, "handle_resume", nil);
653 resumption_age:sample(age); 614 resumption_age:sample(age);
654 end 615 end
655 return true; 616 return true;
656 end 617 end
618
657 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); 619 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
658 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); 620 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
659 621
660 -- Events when it's sensible to request an ack 622 -- Events when it's sensible to request an ack
661 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? 623 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why?