Diff

plugins/mod_smacks.lua @ 12054:0116fa57f05c

mod_smacks: Set a watchdog to watch sleeping sessions Extending the timeout by poking the watchdog, and letting it go on resumption, should be much better than the previous method.
author Kim Alvefur <zash@zash.se>
date Thu, 02 Dec 2021 02:46:26 +0100
parent 12053:03e9587fbfd2
child 12056:e62025f949f9
line wrap: on
line diff
--- a/plugins/mod_smacks.lua	Thu Dec 02 14:41:19 2021 +0100
+++ b/plugins/mod_smacks.lua	Thu Dec 02 02:46:26 2021 +0100
@@ -13,7 +13,6 @@
 
 local tonumber = tonumber;
 local tostring = tostring;
-local math_max = math.max;
 local math_min = math.min;
 local os_time = os.time;
 local t_remove = table.remove;
@@ -24,6 +23,7 @@
 local st = require "util.stanza";
 local timer = require "util.timer";
 local uuid_generate = require "util.uuid".generate;
+local watchdog = require "util.watchdog";
 
 local sessionmanager = require "core.sessionmanager";
 local core_process_stanza = prosody.core_process_stanza;
@@ -413,64 +413,35 @@
 
 module:hook("pre-resource-unbind", function (event)
 	local session = event.session;
-	if session.smacks then
+	if not session.smacks then return end
 		if not session.resumption_token then
 			local queue = session.outgoing_stanza_queue;
 			if #queue > 0 then
 				session.log("debug", "Destroying session with %d unacked stanzas", #queue);
 				handle_unacked_stanzas(session);
 			end
-		else
-			session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout);
-			local hibernate_time = os_time(); -- Track the time we went into hibernation
-			session.hibernating = hibernate_time;
-			local resumption_token = session.resumption_token;
-			module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue});
-			timer.add_task(resume_timeout, function ()
-				session.log("debug", "mod_smacks hibernation timeout reached...");
-				-- We need to check the current resumption token for this resource
-				-- matches the smacks session this timer is for in case it changed
-				-- (for example, the client may have bound a new resource and
-				-- started a new smacks session, or not be using smacks)
-				local curr_session = prosody.full_sessions[session.full_jid];
-				if session.destroyed then
-					session.log("debug", "The session has already been destroyed");
-				elseif curr_session and curr_session.resumption_token == resumption_token
-				-- Check the hibernate time still matches what we think it is,
-				-- otherwise the session resumed and re-hibernated.
-				and session.hibernating == hibernate_time then
-					-- wait longer if the timeout isn't reached because push was enabled for this session
-					-- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients
-					-- wait for an additional resume_timeout seconds if no push occurred since hibernation at all
-					local current_time = os_time();
-					local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating);
-					if session.push_identifier ~= nil and not session.first_hibernated_push then
-						session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout);
-						return resume_timeout;
-					end
-					if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then
-						session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds",
-							resume_timeout - (current_time - timeout_start));
-						return resume_timeout-(current_time-timeout_start); -- time left to wait
-					end
-					session.log("debug", "Destroying session for hibernating too long");
-					session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
-					-- save only actual h value and username/host (for security)
-					old_session_registry:set(session.username, session.resumption_token, {
-						h = session.handled_stanza_count,
-					});
-					session.resumption_token = nil;
-					sessionmanager.destroy_session(session, "Hibernating too long");
-				else
-					session.log("debug", "Session resumed before hibernation timeout, all is well")
-				end
-			end);
-			if session.conn then
-				session.conn:close();
-			end
-			return true; -- Postpone destruction for now
+		return
+	end
+
+	session.hibernating = os_time();
+	session.hibernating_watchdog = watchdog.new(resume_timeout, function()
+		session.log("debug", "mod_smacks hibernation timeout reached...");
+		if session.destroyed then
+			session.log("debug", "The session has already been destroyed");
+			return
 		end
+
+		session.log("debug", "Destroying session for hibernating too long");
+		session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
+		old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count });
+		session.resumption_token = nil;
+		sessionmanager.destroy_session(session, "Hibernating too long");
+	end);
+	if session.conn then
+		session.conn:close();
 	end
+	module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue });
+	return true; -- Postpone destruction for now
 end);
 
 local function handle_s2s_destroyed(event)
@@ -520,8 +491,11 @@
 				:tag("item-not-found", { xmlns = xmlns_errors })
 			);
 		end;
-	elseif session.username == original_session.username
-	and session.host == original_session.host then
+	else
+		if original_session.hibernating_watchdog then
+			original_session.hibernating_watchdog:cancel();
+			original_session.hibernating_watchdog = nil;
+		end
 		session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
 		original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
 		-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
@@ -584,12 +558,6 @@
 		end
 		module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
 		request_ack_if_needed(original_session, true, "handle_resume", nil);
-	else
-		module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
-			session.username or "?", session.host or "?", session.type,
-			original_session.username or "?", original_session.host or "?", original_session.type);
-		session.send(st.stanza("failed", { xmlns = xmlns_sm })
-			:tag("not-authorized", { xmlns = xmlns_errors }));
 	end
 	return true;
 end