Changeset

12677:3b9771d496ed

mod_smacks: Long overdue cleanup of resumption code, fixes some old TODOs
author Matthew Wild <mwild1@gmail.com>
date Fri, 26 Aug 2022 17:04:15 +0100
parents 12676:3ab3ef9584e3
children 12678:5a61e1603f42
files core/sessionmanager.lua plugins/mod_c2s.lua plugins/mod_smacks.lua
diffstat 3 files changed, 66 insertions(+), 52 deletions(-) [+]
line wrap: on
line diff
--- a/core/sessionmanager.lua	Thu Aug 25 22:42:41 2022 +0200
+++ b/core/sessionmanager.lua	Fri Aug 26 17:04:15 2022 +0100
@@ -10,7 +10,7 @@
 local tostring, setmetatable = tostring, setmetatable;
 local pairs, next= pairs, next;
 
-local hosts = prosody.hosts;
+local prosody, hosts = prosody, prosody.hosts;
 local full_sessions = prosody.full_sessions;
 local bare_sessions = prosody.bare_sessions;
 
@@ -92,6 +92,49 @@
 	return setmetatable(session, resting_session);
 end
 
+-- Update a session with a new one (transplanting connection, filters, etc.)
+-- new_session should be discarded after this call returns
+local function update_session(to_session, from_session)
+	to_session.log("debug", "Updating with parameters from session %s", from_session.id);
+	from_session.log("debug", "Session absorbed into %s", to_session.id);
+
+	local replaced_conn = to_session.conn;
+	if replaced_conn then
+		to_session.log("debug", "closing a replaced connection for this session");
+		replaced_conn:close();
+	end
+
+	to_session.ip = from_session.ip;
+	to_session.conn = from_session.conn;
+	to_session.rawsend = from_session.rawsend;
+	to_session.rawsend.session = to_session;
+	to_session.rawsend.conn = to_session.conn;
+	to_session.send = from_session.send;
+	to_session.send.session = to_session;
+	to_session.close = from_session.close;
+	to_session.filter = from_session.filter;
+	to_session.filter.session = to_session;
+	to_session.filters = from_session.filters;
+	to_session.send.filter = to_session.filter;
+	to_session.stream = from_session.stream;
+	to_session.secure = from_session.secure;
+	to_session.hibernating = nil;
+	to_session.resumption_counter = (to_session.resumption_counter or 0) + 1;
+	from_session.log = to_session.log;
+	from_session.type = to_session.type;
+	-- Inform xmppstream of the new session (passed to its callbacks)
+	to_session.stream:set_session(to_session);
+
+	-- Retire the session we've pulled from, to avoid two sessions on the same connection
+	retire_session(from_session);
+
+	prosody.events.fire_event("c2s-session-updated", {
+		session = to_session;
+		from_session = from_session;
+		replaced_conn = replaced_conn;
+	});
+end
+
 local function destroy_session(session, err)
 	(session.log or log)("debug", "Destroying session for %s (%s@%s)%s",
 		session.full_jid or "(unknown)", session.username or "(unknown)",
@@ -267,6 +310,7 @@
 return {
 	new_session = new_session;
 	retire_session = retire_session;
+	update_session = update_session;
 	destroy_session = destroy_session;
 	make_authenticated = make_authenticated;
 	bind_resource = bind_resource;
--- a/plugins/mod_c2s.lua	Thu Aug 25 22:42:41 2022 +0200
+++ b/plugins/mod_c2s.lua	Fri Aug 26 17:04:15 2022 +0100
@@ -262,6 +262,14 @@
 module:hook_global("user-role-changed", disconnect_user_sessions({ condition = "reset", text = "Role changed" }), 200);
 module:hook_global("user-deleted", disconnect_user_sessions({ condition = "not-authorized", text = "Account deleted" }), 200);
 
+module:hook_global("c2s-session-updated", function (event)
+	sessions[event.session.conn] = event.session;
+	local replaced_conn = event.replaced_conn;
+	if replaced_conn then
+		sessions[replaced_conn] = nil;
+	end
+end);
+
 function runner_callbacks:ready()
 	if self.data.conn then
 		self.data.conn:resume();
--- a/plugins/mod_smacks.lua	Thu Aug 25 22:42:41 2022 +0200
+++ b/plugins/mod_smacks.lua	Fri Aug 26 17:04:15 2022 +0100
@@ -196,7 +196,6 @@
 	-- supposed to be nil.
 	-- However, when using mod_smacks with mod_websocket, then mod_websocket's
 	-- stanzas/out filter can get called before this one and adds the xmlns.
-	if session.resending_unacked then return stanza end
 	if not session.smacks then return stanza end
 	local is_stanza = st.is_stanza(stanza) and
 		(not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client')
@@ -496,7 +495,6 @@
 		session.log("debug", "Destroying session for hibernating too long");
 		save_old_session(session);
 		session.resumption_token = nil;
-		session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
 		sessionmanager.destroy_session(session, "Hibernating too long");
 		sessions_expired(1);
 	end);
@@ -529,10 +527,6 @@
 module:hook("s2sout-destroyed", handle_s2s_destroyed);
 module:hook("s2sin-destroyed", handle_s2s_destroyed);
 
-local function get_session_id(session)
-	return session.id or (tostring(session):match("[a-f0-9]+$"));
-end
-
 function handle_resume(session, stanza, xmlns_sm)
 	if session.full_jid then
 		session.log("warn", "Tried to resume after resource binding");
@@ -573,40 +567,11 @@
 			local now = os_time();
 			age = now - original_session.hibernating;
 		end
-		session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
-		original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
-		-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
-		if original_session.conn then
-			original_session.log("debug", "mod_smacks closing an old connection for this session");
-			local conn = original_session.conn;
-			c2s_sessions[conn] = nil;
-			conn:close();
-		end
+
+		session.log("debug", "mod_smacks resuming existing session %s...", original_session.id);
 
-		local migrated_session_log = session.log;
-		original_session.ip = session.ip;
-		original_session.conn = session.conn;
-		original_session.rawsend = session.rawsend;
-		original_session.rawsend.session = original_session;
-		original_session.rawsend.conn = original_session.conn;
-		original_session.send = session.send;
-		original_session.send.session = original_session;
-		original_session.close = session.close;
-		original_session.filter = session.filter;
-		original_session.filter.session = original_session;
-		original_session.filters = session.filters;
-		original_session.send.filter = original_session.filter;
-		original_session.stream = session.stream;
-		original_session.secure = session.secure;
-		original_session.hibernating = nil;
-		original_session.resumption_counter = (original_session.resumption_counter or 0) + 1;
-		session.log = original_session.log;
-		session.type = original_session.type;
-		wrap_session(original_session, true);
-		-- Inform xmppstream of the new session (passed to its callbacks)
-		original_session.stream:set_session(original_session);
-		-- Similar for connlisteners
-		c2s_sessions[session.conn] = original_session;
+		-- Update original_session with the parameters (connection, etc.) from the new session
+		sessionmanager.update_session(original_session, session);
 
 		local queue = original_session.outgoing_stanza_queue;
 		local h = tonumber(stanza.attr.h);
@@ -633,20 +598,16 @@
 		-- We have to use the send of "session" because we don't want to add our resent stanzas
 		-- to the outgoing queue again
 
-		session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
-		-- FIXME Which session is it that the queue filter sees?
-		session.resending_unacked = true;
-		original_session.resending_unacked = true;
+		original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
 		for _, queued_stanza in queue:resume() do
-			session.send(queued_stanza);
+			original_session.send(queued_stanza);
 		end
-		session.resending_unacked = nil;
-		original_session.resending_unacked = nil;
-		session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked());
-		function session.send(stanza) -- luacheck: ignore 432
-			migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
-			return false;
-		end
+		session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked());
+
+		-- Add our own handlers to the resumed session (filters have been reset in the update)
+		wrap_session(original_session, true);
+
+		-- Let everyone know that we are no longer hibernating
 		module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
 		original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
 		request_ack_now_if_needed(original_session, true, "handle_resume", nil);
@@ -654,6 +615,7 @@
 	end
 	return true;
 end
+
 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);