Changeset

11937:364c3f018e3a

mod_smacks: Refactor ack requesting to avoid some timer roundtrips The function was too large to comprehend! Breaking it up helps readability and reuse. The timer round rip is only to avoid ordering weirdness when sending from inside a stanza filter. No need when handling <r> and <a> CSI interactions both boiled down to sending an <r> immediately.
author Kim Alvefur <zash@zash.se>
date Wed, 24 Nov 2021 21:27:49 +0100
parents 11936:3f49c35607ca
children 11938:6da703cb4c04
files plugins/mod_smacks.lua
diffstat 1 files changed, 61 insertions(+), 55 deletions(-) [+]
line wrap: on
line diff
--- a/plugins/mod_smacks.lua	Wed Nov 24 21:27:45 2021 +0100
+++ b/plugins/mod_smacks.lua	Wed Nov 24 21:27:49 2021 +0100
@@ -145,45 +145,63 @@
 			end
 		end);
 
-local function request_ack_if_needed(session, force, reason, stanza)
+local function should_ack(session, force)
+	if not session then return end -- shouldn't be possible
+	if session.destroyed then return end -- gone
+	if not session.smacks then return end -- not using
+	if session.hibernating then return end -- can't ack when asleep
+	if session.awaiting_ack then return end -- already waiting
+	if force then return force end
 	local queue = session.outgoing_stanza_queue;
 	local expected_h = session.last_acknowledged_stanza + #queue;
 	local max_unacked = max_unacked_stanzas;
 	if session.state == "inactive" then
 		max_unacked = max_inactive_unacked_stanzas;
 	end
-	if session.awaiting_ack == nil and not session.hibernating then
-		-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
-		-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
-		-- further requests until a higher h-value would be expected.
-		if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
-			session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
-			session.awaiting_ack = false;
-			session.awaiting_ack_timer = timer.add_task(1e-06, function ()
-				-- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
-				-- only request ack if needed and our session is not already hibernated or destroyed
-				if not session.awaiting_ack and not session.hibernating and not session.destroyed then
-					session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
-					(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
-					if session.destroyed then return end -- sending something can trigger destruction
-					session.awaiting_ack = true;
-					-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
-					session.last_requested_h = session.last_acknowledged_stanza + #queue;
-					session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
-					if not session.delayed_ack_timer then
-						session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
-							ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
-						end);
-					end
-				end
-			end);
-		end
+	-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
+	-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
+	-- further requests until a higher h-value would be expected.
+	return #queue > max_unacked and expected_h ~= session.last_requested_h;
+end
+
+local function request_ack(session, reason)
+	local queue = session.outgoing_stanza_queue;
+	session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
+	(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
+	if session.destroyed then return end -- sending something can trigger destruction
+	session.awaiting_ack = true;
+	-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
+	session.last_requested_h = session.last_acknowledged_stanza + #queue;
+	session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
+	if not session.delayed_ack_timer then
+		session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
+			ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
+		end);
+	end
+end
+
+local function request_ack_now_if_needed(session, force, reason)
+	if should_ack(session, force) then
+		request_ack(session, reason);
+	end
+end
+
+local function request_ack_if_needed(session, force, reason, stanza)
+	if should_ack(session, force) then
+		timer.add_task(0, function ()
+			request_ack_now_if_needed(session, force, reason, stanza);
+		end);
 	end
 
 	-- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
 	-- and there isn't already a timer for this event running.
 	-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
 	-- would not trigger this event (again).
+	local queue = session.outgoing_stanza_queue;
+	local max_unacked = max_unacked_stanzas;
+	if session.state == "inactive" then
+		max_unacked = max_inactive_unacked_stanzas;
+	end
 	if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
 		session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
 		ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
@@ -338,10 +356,7 @@
 	-- Reply with <a>
 	(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) }));
 	-- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
-	local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
-	if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
-		request_ack_if_needed(origin, true, "piggybacked by handle_r", nil);
-	end
+	request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil);
 	return true;
 end
 module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
@@ -385,7 +400,7 @@
 
 	origin.log("debug", "#queue = %d", #queue);
 	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
-	request_ack_if_needed(origin, false, "handle_a", nil)
+	request_ack_now_if_needed(origin, false, "handle_a", nil)
 	return true;
 end
 module:hook_tag(xmlns_sm2, "a", handle_a);
@@ -632,22 +647,20 @@
 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
 
-module:hook("csi-client-active", function (event)
-	if event.origin.smacks then
-		request_ack_if_needed(event.origin, true, "csi-active", nil);
-	end
-end);
+-- Events when it's sensible to request an ack
+-- Could experiment with forcing (ignoring max_unacked) <r>, but when and why?
+local request_ack_events = {
+	["csi-client-active"] = true;
+	["csi-flushing"] = false;
+};
 
-module:hook("csi-flushing", function(event)
-	local session = event.session;
-	if session.smacks then
-		if not session.awaiting_ack and not session.hibernating and not session.destroyed then
-			session.log("debug", "Sending <r> (csi-flushing)");
-			session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first
-			(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
-		end
-	end
-end);
+for event_name, force in pairs(request_ack_events) do
+	module:log("info", "module:hook(%q, function)");
+	module:hook(event_name, function(event)
+		local session = event.session or event.origin;
+		request_ack_now_if_needed(session, force, event_name);
+	end);
+end
 
 local function handle_read_timeout(event)
 	local session = event.session;
@@ -663,14 +676,7 @@
 			end
 			return false; -- Kick the session
 		end
-		session.log("debug", "Sending <r> (read timeout)");
-		(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
-		session.awaiting_ack = true;
-		if not session.delayed_ack_timer then
-			session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
-				ack_delayed(session, nil);
-			end);
-		end
+		request_ack_now_if_needed(session, true, "read timeout");
 		return true;
 	end
 end