Comparison

plugins/mod_smacks.lua @ 11937:364c3f018e3a

mod_smacks: Refactor ack requesting to avoid some timer roundtrips The function was too large to comprehend! Breaking it up helps readability and reuse. The timer round rip is only to avoid ordering weirdness when sending from inside a stanza filter. No need when handling <r> and <a> CSI interactions both boiled down to sending an <r> immediately.
author Kim Alvefur <zash@zash.se>
date Wed, 24 Nov 2021 21:27:49 +0100
parent 11936:3f49c35607ca
child 11938:6da703cb4c04
comparison
equal deleted inserted replaced
11936:3f49c35607ca 11937:364c3f018e3a
143 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); 143 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
144 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); 144 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
145 end 145 end
146 end); 146 end);
147 147
148 local function request_ack_if_needed(session, force, reason, stanza) 148 local function should_ack(session, force)
149 if not session then return end -- shouldn't be possible
150 if session.destroyed then return end -- gone
151 if not session.smacks then return end -- not using
152 if session.hibernating then return end -- can't ack when asleep
153 if session.awaiting_ack then return end -- already waiting
154 if force then return force end
149 local queue = session.outgoing_stanza_queue; 155 local queue = session.outgoing_stanza_queue;
150 local expected_h = session.last_acknowledged_stanza + #queue; 156 local expected_h = session.last_acknowledged_stanza + #queue;
151 local max_unacked = max_unacked_stanzas; 157 local max_unacked = max_unacked_stanzas;
152 if session.state == "inactive" then 158 if session.state == "inactive" then
153 max_unacked = max_inactive_unacked_stanzas; 159 max_unacked = max_inactive_unacked_stanzas;
154 end 160 end
155 if session.awaiting_ack == nil and not session.hibernating then 161 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
156 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong 162 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
157 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any 163 -- further requests until a higher h-value would be expected.
158 -- further requests until a higher h-value would be expected. 164 return #queue > max_unacked and expected_h ~= session.last_requested_h;
159 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then 165 end
160 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); 166
161 session.awaiting_ack = false; 167 local function request_ack(session, reason)
162 session.awaiting_ack_timer = timer.add_task(1e-06, function () 168 local queue = session.outgoing_stanza_queue;
163 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); 169 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
164 -- only request ack if needed and our session is not already hibernated or destroyed 170 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
165 if not session.awaiting_ack and not session.hibernating and not session.destroyed then 171 if session.destroyed then return end -- sending something can trigger destruction
166 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); 172 session.awaiting_ack = true;
167 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 173 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
168 if session.destroyed then return end -- sending something can trigger destruction 174 session.last_requested_h = session.last_acknowledged_stanza + #queue;
169 session.awaiting_ack = true; 175 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
170 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) 176 if not session.delayed_ack_timer then
171 session.last_requested_h = session.last_acknowledged_stanza + #queue; 177 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
172 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); 178 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
173 if not session.delayed_ack_timer then 179 end);
174 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() 180 end
175 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue 181 end
176 end); 182
177 end 183 local function request_ack_now_if_needed(session, force, reason)
178 end 184 if should_ack(session, force) then
179 end); 185 request_ack(session, reason);
180 end 186 end
187 end
188
189 local function request_ack_if_needed(session, force, reason, stanza)
190 if should_ack(session, force) then
191 timer.add_task(0, function ()
192 request_ack_now_if_needed(session, force, reason, stanza);
193 end);
181 end 194 end
182 195
183 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue 196 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
184 -- and there isn't already a timer for this event running. 197 -- and there isn't already a timer for this event running.
185 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event 198 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
186 -- would not trigger this event (again). 199 -- would not trigger this event (again).
200 local queue = session.outgoing_stanza_queue;
201 local max_unacked = max_unacked_stanzas;
202 if session.state == "inactive" then
203 max_unacked = max_inactive_unacked_stanzas;
204 end
187 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then 205 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
188 session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); 206 session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
189 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules 207 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
190 end 208 end
191 end 209 end
336 end 354 end
337 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); 355 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
338 -- Reply with <a> 356 -- Reply with <a>
339 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); 357 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) }));
340 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) 358 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
341 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; 359 request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil);
342 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
343 request_ack_if_needed(origin, true, "piggybacked by handle_r", nil);
344 end
345 return true; 360 return true;
346 end 361 end
347 module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); 362 module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
348 module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); 363 module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
349 364
383 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); 398 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza });
384 end 399 end
385 400
386 origin.log("debug", "#queue = %d", #queue); 401 origin.log("debug", "#queue = %d", #queue);
387 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; 402 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
388 request_ack_if_needed(origin, false, "handle_a", nil) 403 request_ack_now_if_needed(origin, false, "handle_a", nil)
389 return true; 404 return true;
390 end 405 end
391 module:hook_tag(xmlns_sm2, "a", handle_a); 406 module:hook_tag(xmlns_sm2, "a", handle_a);
392 module:hook_tag(xmlns_sm3, "a", handle_a); 407 module:hook_tag(xmlns_sm3, "a", handle_a);
393 408
630 return true; 645 return true;
631 end 646 end
632 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); 647 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
633 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); 648 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
634 649
635 module:hook("csi-client-active", function (event) 650 -- Events when it's sensible to request an ack
636 if event.origin.smacks then 651 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why?
637 request_ack_if_needed(event.origin, true, "csi-active", nil); 652 local request_ack_events = {
638 end 653 ["csi-client-active"] = true;
639 end); 654 ["csi-flushing"] = false;
640 655 };
641 module:hook("csi-flushing", function(event) 656
642 local session = event.session; 657 for event_name, force in pairs(request_ack_events) do
643 if session.smacks then 658 module:log("info", "module:hook(%q, function)");
644 if not session.awaiting_ack and not session.hibernating and not session.destroyed then 659 module:hook(event_name, function(event)
645 session.log("debug", "Sending <r> (csi-flushing)"); 660 local session = event.session or event.origin;
646 session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first 661 request_ack_now_if_needed(session, force, event_name);
647 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 662 end);
648 end 663 end
649 end
650 end);
651 664
652 local function handle_read_timeout(event) 665 local function handle_read_timeout(event)
653 local session = event.session; 666 local session = event.session;
654 if session.smacks then 667 if session.smacks then
655 if session.awaiting_ack then 668 if session.awaiting_ack then
661 timer.stop(session.delayed_ack_timer); 674 timer.stop(session.delayed_ack_timer);
662 session.delayed_ack_timer = nil; 675 session.delayed_ack_timer = nil;
663 end 676 end
664 return false; -- Kick the session 677 return false; -- Kick the session
665 end 678 end
666 session.log("debug", "Sending <r> (read timeout)"); 679 request_ack_now_if_needed(session, true, "read timeout");
667 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
668 session.awaiting_ack = true;
669 if not session.delayed_ack_timer then
670 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
671 ack_delayed(session, nil);
672 end);
673 end
674 return true; 680 return true;
675 end 681 end
676 end 682 end
677 683
678 module:hook("s2s-read-timeout", handle_read_timeout); 684 module:hook("s2s-read-timeout", handle_read_timeout);