Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 11937:364c3f018e3a
mod_smacks: Refactor ack requesting to avoid some timer roundtrips
The function was too large to comprehend! Breaking it up helps
readability and reuse.
The timer round trip is only to avoid ordering weirdness when sending
from inside a stanza filter. No need when handling <r> and <a>.
CSI interactions both boiled down to sending an <r> immediately.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Wed, 24 Nov 2021 21:27:49 +0100 |
parent | 11936:3f49c35607ca |
child | 11938:6da703cb4c04 |
comparison
equal
deleted
inserted
replaced
11936:3f49c35607ca | 11937:364c3f018e3a |
---|---|
143 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); | 143 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); |
144 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); | 144 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); |
145 end | 145 end |
146 end); | 146 end); |
147 | 147 |
148 local function request_ack_if_needed(session, force, reason, stanza) | 148 local function should_ack(session, force) |
149 if not session then return end -- shouldn't be possible | |
150 if session.destroyed then return end -- gone | |
151 if not session.smacks then return end -- not using | |
152 if session.hibernating then return end -- can't ack when asleep | |
153 if session.awaiting_ack then return end -- already waiting | |
154 if force then return force end | |
149 local queue = session.outgoing_stanza_queue; | 155 local queue = session.outgoing_stanza_queue; |
150 local expected_h = session.last_acknowledged_stanza + #queue; | 156 local expected_h = session.last_acknowledged_stanza + #queue; |
151 local max_unacked = max_unacked_stanzas; | 157 local max_unacked = max_unacked_stanzas; |
152 if session.state == "inactive" then | 158 if session.state == "inactive" then |
153 max_unacked = max_inactive_unacked_stanzas; | 159 max_unacked = max_inactive_unacked_stanzas; |
154 end | 160 end |
155 if session.awaiting_ack == nil and not session.hibernating then | 161 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong |
156 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong | 162 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any |
157 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any | 163 -- further requests until a higher h-value would be expected. |
158 -- further requests until a higher h-value would be expected. | 164 return #queue > max_unacked and expected_h ~= session.last_requested_h; |
159 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then | 165 end |
160 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); | 166 |
161 session.awaiting_ack = false; | 167 local function request_ack(session, reason) |
162 session.awaiting_ack_timer = timer.add_task(1e-06, function () | 168 local queue = session.outgoing_stanza_queue; |
163 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); | 169 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); |
164 -- only request ack if needed and our session is not already hibernated or destroyed | 170 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
165 if not session.awaiting_ack and not session.hibernating and not session.destroyed then | 171 if session.destroyed then return end -- sending something can trigger destruction |
166 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); | 172 session.awaiting_ack = true; |
167 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 173 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
168 if session.destroyed then return end -- sending something can trigger destruction | 174 session.last_requested_h = session.last_acknowledged_stanza + #queue; |
169 session.awaiting_ack = true; | 175 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); |
170 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) | 176 if not session.delayed_ack_timer then |
171 session.last_requested_h = session.last_acknowledged_stanza + #queue; | 177 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
172 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); | 178 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
173 if not session.delayed_ack_timer then | 179 end); |
174 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() | 180 end |
175 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue | 181 end |
176 end); | 182 |
177 end | 183 local function request_ack_now_if_needed(session, force, reason) |
178 end | 184 if should_ack(session, force) then |
179 end); | 185 request_ack(session, reason); |
180 end | 186 end |
187 end | |
188 | |
189 local function request_ack_if_needed(session, force, reason, stanza) | |
190 if should_ack(session, force) then | |
191 timer.add_task(0, function () | |
192 request_ack_now_if_needed(session, force, reason, stanza); | |
193 end); | |
181 end | 194 end |
182 | 195 |
183 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue | 196 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue |
184 -- and there isn't already a timer for this event running. | 197 -- and there isn't already a timer for this event running. |
185 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event | 198 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event |
186 -- would not trigger this event (again). | 199 -- would not trigger this event (again). |
200 local queue = session.outgoing_stanza_queue; | |
201 local max_unacked = max_unacked_stanzas; | |
202 if session.state == "inactive" then | |
203 max_unacked = max_inactive_unacked_stanzas; | |
204 end | |
187 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then | 205 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then |
188 session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); | 206 session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); |
189 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules | 207 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules |
190 end | 208 end |
191 end | 209 end |
336 end | 354 end |
337 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); | 355 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); |
338 -- Reply with <a> | 356 -- Reply with <a> |
339 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); | 357 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); |
340 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) | 358 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) |
341 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; | 359 request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil); |
342 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then | |
343 request_ack_if_needed(origin, true, "piggybacked by handle_r", nil); | |
344 end | |
345 return true; | 360 return true; |
346 end | 361 end |
347 module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); | 362 module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); |
348 module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); | 363 module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); |
349 | 364 |
383 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); | 398 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); |
384 end | 399 end |
385 | 400 |
386 origin.log("debug", "#queue = %d", #queue); | 401 origin.log("debug", "#queue = %d", #queue); |
387 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; | 402 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; |
388 request_ack_if_needed(origin, false, "handle_a", nil) | 403 request_ack_now_if_needed(origin, false, "handle_a", nil) |
389 return true; | 404 return true; |
390 end | 405 end |
391 module:hook_tag(xmlns_sm2, "a", handle_a); | 406 module:hook_tag(xmlns_sm2, "a", handle_a); |
392 module:hook_tag(xmlns_sm3, "a", handle_a); | 407 module:hook_tag(xmlns_sm3, "a", handle_a); |
393 | 408 |
630 return true; | 645 return true; |
631 end | 646 end |
632 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 647 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |
633 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | 648 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); |
634 | 649 |
635 module:hook("csi-client-active", function (event) | 650 -- Events when it's sensible to request an ack |
636 if event.origin.smacks then | 651 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? |
637 request_ack_if_needed(event.origin, true, "csi-active", nil); | 652 local request_ack_events = { |
638 end | 653 ["csi-client-active"] = true; |
639 end); | 654 ["csi-flushing"] = false; |
640 | 655 }; |
641 module:hook("csi-flushing", function(event) | 656 |
642 local session = event.session; | 657 for event_name, force in pairs(request_ack_events) do |
643 if session.smacks then | 658 module:log("info", "module:hook(%q, function)"); |
644 if not session.awaiting_ack and not session.hibernating and not session.destroyed then | 659 module:hook(event_name, function(event) |
645 session.log("debug", "Sending <r> (csi-flushing)"); | 660 local session = event.session or event.origin; |
646 session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first | 661 request_ack_now_if_needed(session, force, event_name); |
647 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 662 end); |
648 end | 663 end |
649 end | |
650 end); | |
651 | 664 |
652 local function handle_read_timeout(event) | 665 local function handle_read_timeout(event) |
653 local session = event.session; | 666 local session = event.session; |
654 if session.smacks then | 667 if session.smacks then |
655 if session.awaiting_ack then | 668 if session.awaiting_ack then |
661 timer.stop(session.delayed_ack_timer); | 674 timer.stop(session.delayed_ack_timer); |
662 session.delayed_ack_timer = nil; | 675 session.delayed_ack_timer = nil; |
663 end | 676 end |
664 return false; -- Kick the session | 677 return false; -- Kick the session |
665 end | 678 end |
666 session.log("debug", "Sending <r> (read timeout)"); | 679 request_ack_now_if_needed(session, true, "read timeout"); |
667 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); | |
668 session.awaiting_ack = true; | |
669 if not session.delayed_ack_timer then | |
670 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() | |
671 ack_delayed(session, nil); | |
672 end); | |
673 end | |
674 return true; | 680 return true; |
675 end | 681 end |
676 end | 682 end |
677 | 683 |
678 module:hook("s2s-read-timeout", handle_read_timeout); | 684 module:hook("s2s-read-timeout", handle_read_timeout); |