# HG changeset patch # User Matthew Wild # Date 1661537256 -3600 # Node ID 464a22f2751cf14db463c0dcfd5d702e4a245239 # Parent 16ea01745dbe9da123b75b830138d0c396954d0b mod_smacks: Split resumption into multiple stages, to simplify ISR integration This will allow us to return the success/failed as part of the SASL2 response, and *then* perform the stanza sync as a second step. diff -r 16ea01745dbe -r 464a22f2751c plugins/mod_smacks.lua --- a/plugins/mod_smacks.lua Fri Aug 26 19:10:15 2022 +0200 +++ b/plugins/mod_smacks.lua Fri Aug 26 19:07:36 2022 +0100 @@ -107,6 +107,12 @@ overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" } }); +local resume_errors = require "util.error".init("mod_smacks", xmlns_sm3, { + expired = { condition = "item-not-found", text = "Session expired, and cannot be resumed" }; + already_bound = { condition = "unexpected-request", text = "Cannot resume another session after a resource is bound" }; + unknown_session = { condition = "item-not-found", text = "Unknown session" }; +}); + -- COMPAT note the use of compatibility wrapper in events (queue:table()) local function ack_delayed(session, stanza) @@ -527,13 +533,10 @@ module:hook("s2sout-destroyed", handle_s2s_destroyed); module:hook("s2sin-destroyed", handle_s2s_destroyed); -function handle_resume(session, stanza, xmlns_sm) +function do_resume(session, stanza) if session.full_jid then session.log("warn", "Tried to resume after resource binding"); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("unexpected-request", { xmlns = xmlns_errors }) - ); - return true; + return nil, resume_errors.new("already_bound"); end local id = stanza.attr.previd; @@ -542,78 +545,94 @@ local old_session = old_session_registry:get(session.username, id); if old_session then session.log("debug", "Tried to resume old expired session with id %s", id); - session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); clear_old_session(session, id); resumption_expired(1); - else - session.log("debug", "Tried to resume non-existent session with id %s", id); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); - end; - else - if original_session.hibernating_watchdog then - original_session.log("debug", "Letting the watchdog go"); - original_session.hibernating_watchdog:cancel(); - original_session.hibernating_watchdog = nil; - elseif session.hibernating then - original_session.log("error", "Hibernating session has no watchdog!") + return nil, resume_errors.new("expired", { h = old_session.h }); end - -- zero age = was not hibernating yet - local age = 0; - if original_session.hibernating then - local now = os_time(); - age = now - original_session.hibernating; - end + session.log("debug", "Tried to resume non-existent session with id %s", id); + return nil, resume_errors.new("unknown_session"); + end - session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); + if original_session.hibernating_watchdog then + original_session.log("debug", "Letting the watchdog go"); + original_session.hibernating_watchdog:cancel(); + original_session.hibernating_watchdog = nil; + elseif session.hibernating then + original_session.log("error", "Hibernating session has no watchdog!") + end + -- zero age = was not hibernating yet + local age = 0; + if original_session.hibernating then + local now = os_time(); + age = now - original_session.hibernating; + end - local queue = original_session.outgoing_stanza_queue; - local h = tonumber(stanza.attr.h); + session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); + + local queue = original_session.outgoing_stanza_queue; + local h = tonumber(stanza.attr.h); + + original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) + local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked - original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) - local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked + if not err and not queue:resumable() then + err = ack_errors.new("overflow"); + end - if not err and not queue:resumable() then - err = ack_errors.new("overflow"); - end + if err then + session.log("debug", "Resumption failed: %s", err); + return nil, err; + end + + -- Update original_session with the parameters (connection, etc.) from the new session + sessionmanager.update_session(original_session, session); - if err then - session.send(st.stanza("failed", - { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id })); - session.log("debug", "Resumption failed: %s", err); - return true; - end + return { + session = original_session; + id = id; + -- Return function to complete the resumption and resync unacked stanzas + -- This is two steps so we can support SASL2/ISR + finish = function () + -- Ok, we need to re-send any stanzas that the client didn't see + -- ...they are what is now left in the outgoing stanza queue + -- We have to use the send of "session" because we don't want to add our resent stanzas + -- to the outgoing queue again - -- Update original_session with the parameters (connection, etc.) from the new session - sessionmanager.update_session(original_session, session); + original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); + for _, queued_stanza in queue:resume() do + original_session.send(queued_stanza); + end + original_session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); - -- Inform client of successful resumption - original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, - h = format_h(original_session.handled_stanza_count), previd = id })); + -- Add our own handlers to the resumed session (filters have been reset in the update) + wrap_session(original_session, true); - -- Ok, we need to re-send any stanzas that the client didn't see - -- ...they are what is now left in the outgoing stanza queue - -- We have to use the send of "session" because we don't want to add our resent stanzas - -- to the outgoing queue again + -- Let everyone know that we are no longer hibernating + module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); + original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption + request_ack_now_if_needed(original_session, true, "handle_resume", nil); + resumption_age:sample(age); + end; + }; +end - original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); - for _, queued_stanza in queue:resume() do - original_session.send(queued_stanza); - end - session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); +function handle_resume(session, stanza, xmlns_sm) + local resumed, err = do_resume(session, stanza); + if not resumed then + session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(err.context.h) }) + :tag(err.condition, { xmlns = xmlns_errors })); + return true; + end - -- Add our own handlers to the resumed session (filters have been reset in the update) - wrap_session(original_session, true); + session = resumed.session; - -- Let everyone know that we are no longer hibernating - module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); - original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption - request_ack_now_if_needed(original_session, true, "handle_resume", nil); - resumption_age:sample(age); - end + -- Inform client of successful resumption + session.send(st.stanza("resumed", { xmlns = xmlns_sm, + h = format_h(session.handled_stanza_count), previd = resumed.id })); + + -- Complete resume (sync stanzas, etc.) + resumed.finish(); + return true; end