Comparison

mod_smacks/mod_smacks.lua @ 4413:0b9501f82e63

mod_smacks: allow O(1) processing of delayed ack events This adds a stanza field to the eent if the stanza which triggered this event is known exactly.
author tmolitor <thilo@eightysoft.de>
date Sat, 30 Jan 2021 07:19:35 +0100
parent 4189:22e7b3d6fcae
child 4442:74da3643c62d
comparison
equal deleted inserted replaced
4412:e5493a10c4d1 4413:0b9501f82e63
114 stop = function(self) stopped = true end; 114 stop = function(self) stopped = true end;
115 timer; 115 timer;
116 }; 116 };
117 end 117 end
118 118
119 local function delayed_ack_function(session) 119 local function delayed_ack_function(session, stanza)
120 -- fire event only if configured to do so and our session is not already hibernated or destroyed 120 -- fire event only if configured to do so and our session is not already hibernated or destroyed
121 if delayed_ack_timeout > 0 and session.awaiting_ack 121 if delayed_ack_timeout > 0 and session.awaiting_ack
122 and not session.hibernating and not session.destroyed then 122 and not session.hibernating and not session.destroyed then
123 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", 123 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
124 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); 124 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0);
125 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue}); 125 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza});
126 end 126 end
127 session.delayed_ack_timer = nil; 127 session.delayed_ack_timer = nil;
128 end 128 end
129 129
130 local function can_do_smacks(session, advertise_only) 130 local function can_do_smacks(session, advertise_only)
156 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); 156 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
157 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); 157 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
158 end 158 end
159 end); 159 end);
160 160
161 local function request_ack_if_needed(session, force, reason) 161 local function request_ack_if_needed(session, force, reason, stanza)
162 local queue = session.outgoing_stanza_queue; 162 local queue = session.outgoing_stanza_queue;
163 local expected_h = session.last_acknowledged_stanza + #queue; 163 local expected_h = session.last_acknowledged_stanza + #queue;
164 -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); 164 -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
165 if session.awaiting_ack == nil and not session.hibernating then 165 if session.awaiting_ack == nil and not session.hibernating then
166 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong 166 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
180 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) 180 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
181 session.last_requested_h = session.last_acknowledged_stanza + #queue; 181 session.last_requested_h = session.last_acknowledged_stanza + #queue;
182 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); 182 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
183 if not session.delayed_ack_timer then 183 if not session.delayed_ack_timer then
184 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() 184 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
185 delayed_ack_function(session); 185 delayed_ack_function(session, nil); -- we don't know if this is the only new stanza in the queue
186 end); 186 end);
187 end 187 end
188 end 188 end
189 end); 189 end);
190 end 190 end
194 -- and there isn't already a timer for this event running. 194 -- and there isn't already a timer for this event running.
195 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event 195 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
196 -- would not trigger this event (again). 196 -- would not trigger this event (again).
197 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then 197 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then
198 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); 198 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
199 delayed_ack_function(session); 199 delayed_ack_function(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
200 end 200 end
201 end 201 end
202 202
203 local function outgoing_stanza_filter(stanza, session) 203 local function outgoing_stanza_filter(stanza, session)
204 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's 204 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's
226 if session.hibernating then 226 if session.hibernating then
227 session.log("debug", "hibernating, stanza queued"); 227 session.log("debug", "hibernating, stanza queued");
228 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); 228 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza});
229 return nil; 229 return nil;
230 end 230 end
231 request_ack_if_needed(session, false, "outgoing_stanza_filter"); 231 request_ack_if_needed(session, false, "outgoing_stanza_filter", stanza);
232 end 232 end
233 return stanza; 233 return stanza;
234 end 234 end
235 235
236 local function count_incoming_stanzas(stanza, session) 236 local function count_incoming_stanzas(stanza, session)
346 -- Reply with <a> 346 -- Reply with <a>
347 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); 347 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) }));
348 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) 348 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
349 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; 349 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
350 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then 350 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
351 request_ack_if_needed(origin, true, "piggybacked by handle_r"); 351 request_ack_if_needed(origin, true, "piggybacked by handle_r", nil);
352 end 352 end
353 return true; 353 return true;
354 end 354 end
355 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); 355 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
356 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); 356 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
388 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); 388 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza });
389 end 389 end
390 390
391 origin.log("debug", "#queue = %d", #queue); 391 origin.log("debug", "#queue = %d", #queue);
392 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; 392 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
393 request_ack_if_needed(origin, false, "handle_a") 393 request_ack_if_needed(origin, false, "handle_a", nil)
394 return true; 394 return true;
395 end 395 end
396 module:hook_stanza(xmlns_sm2, "a", handle_a); 396 module:hook_stanza(xmlns_sm2, "a", handle_a);
397 module:hook_stanza(xmlns_sm3, "a", handle_a); 397 module:hook_stanza(xmlns_sm3, "a", handle_a);
398 398
621 function session.send(stanza) 621 function session.send(stanza)
622 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); 622 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
623 return false; 623 return false;
624 end 624 end
625 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); 625 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue});
626 request_ack_if_needed(original_session, true, "handle_resume"); 626 request_ack_if_needed(original_session, true, "handle_resume", nil);
627 else 627 else
628 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", 628 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
629 session.username or "?", session.host or "?", session.type, 629 session.username or "?", session.host or "?", session.type,
630 original_session.username or "?", original_session.host or "?", original_session.type); 630 original_session.username or "?", original_session.host or "?", original_session.type);
631 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 631 session.send(st.stanza("failed", { xmlns = xmlns_sm })
652 session.log("debug", "Sending <r> (read timeout)"); 652 session.log("debug", "Sending <r> (read timeout)");
653 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); 653 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
654 session.awaiting_ack = true; 654 session.awaiting_ack = true;
655 if not session.delayed_ack_timer then 655 if not session.delayed_ack_timer then
656 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() 656 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
657 delayed_ack_function(session); 657 delayed_ack_function(session, nil);
658 end); 658 end);
659 end 659 end
660 return true; 660 return true;
661 end 661 end
662 end 662 end