Comparison

plugins/mod_smacks.lua @ 11936:3f49c35607ca

mod_smacks: Fix timer lifetimes to follow session instead of module module:add_timer() creates a timer that stops working after the module is reloaded or unloaded, in order to prevent leaks. However, when the timers control vital session behavior, breakage occurs after reload. E.g. sessions would stop requesting acks and stop responding to acks.
author Kim Alvefur <zash@zash.se>
date Wed, 24 Nov 2021 21:27:45 +0100
parent 11935:4d0d10fabb82
child 11937:364c3f018e3a
comparison
equal deleted inserted replaced
11935:4d0d10fabb82 11936:3f49c35607ca
157 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any 157 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
158 -- further requests until a higher h-value would be expected. 158 -- further requests until a higher h-value would be expected.
159 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then 159 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
160 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); 160 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
161 session.awaiting_ack = false; 161 session.awaiting_ack = false;
162 session.awaiting_ack_timer = module:add_timer(1e-06, function () 162 session.awaiting_ack_timer = timer.add_task(1e-06, function ()
163 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); 163 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
164 -- only request ack if needed and our session is not already hibernated or destroyed 164 -- only request ack if needed and our session is not already hibernated or destroyed
165 if not session.awaiting_ack and not session.hibernating and not session.destroyed then 165 if not session.awaiting_ack and not session.hibernating and not session.destroyed then
166 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); 166 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
167 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 167 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
169 session.awaiting_ack = true; 169 session.awaiting_ack = true;
170 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) 170 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
171 session.last_requested_h = session.last_acknowledged_stanza + #queue; 171 session.last_requested_h = session.last_acknowledged_stanza + #queue;
172 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); 172 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
173 if not session.delayed_ack_timer then 173 if not session.delayed_ack_timer then
174 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() 174 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
175 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue 175 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
176 end); 176 end);
177 end 177 end
178 end 178 end
179 end); 179 end);
296 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); 296 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
297 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); 297 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
298 298
299 module:hook_tag("http://etherx.jabber.org/streams", "features", 299 module:hook_tag("http://etherx.jabber.org/streams", "features",
300 function (session, stanza) 300 function (session, stanza)
301 module:add_timer(1e-6, function () 301 -- Needs to be done after flushing sendq since those aren't stored as
302 -- stanzas and counting them is weird.
303 timer.add_task(1e-6, function ()
302 if can_do_smacks(session) then 304 if can_do_smacks(session) then
303 if stanza:get_child("sm", xmlns_sm3) then 305 if stanza:get_child("sm", xmlns_sm3) then
304 session.sends2s(st.stanza("enable", sm3_attr)); 306 session.sends2s(st.stanza("enable", sm3_attr));
305 session.smacks = xmlns_sm3; 307 session.smacks = xmlns_sm3;
306 elseif stanza:get_child("sm", xmlns_sm2) then 308 elseif stanza:get_child("sm", xmlns_sm2) then
347 349
348 function handle_a(origin, stanza) 350 function handle_a(origin, stanza)
349 if not origin.smacks then return; end 351 if not origin.smacks then return; end
350 origin.awaiting_ack = nil; 352 origin.awaiting_ack = nil;
351 if origin.awaiting_ack_timer then 353 if origin.awaiting_ack_timer then
352 origin.awaiting_ack_timer:stop(); 354 timer.stop(origin.awaiting_ack_timer);
355 origin.awaiting_ack_timer = nil;
353 end 356 end
354 if origin.delayed_ack_timer then 357 if origin.delayed_ack_timer then
355 origin.delayed_ack_timer:stop(); 358 timer.stop(origin.delayed_ack_timer)
356 origin.delayed_ack_timer = nil; 359 origin.delayed_ack_timer = nil;
357 end 360 end
358 -- Remove handled stanzas from outgoing_stanza_queue 361 -- Remove handled stanzas from outgoing_stanza_queue
359 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); 362 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
360 local h = tonumber(stanza.attr.h); 363 local h = tonumber(stanza.attr.h);
649 local function handle_read_timeout(event) 652 local function handle_read_timeout(event)
650 local session = event.session; 653 local session = event.session;
651 if session.smacks then 654 if session.smacks then
652 if session.awaiting_ack then 655 if session.awaiting_ack then
653 if session.awaiting_ack_timer then 656 if session.awaiting_ack_timer then
654 session.awaiting_ack_timer:stop(); 657 timer.stop(session.awaiting_ack_timer);
658 session.awaiting_ack_timer = nil;
655 end 659 end
656 if session.delayed_ack_timer then 660 if session.delayed_ack_timer then
657 session.delayed_ack_timer:stop(); 661 timer.stop(session.delayed_ack_timer);
658 session.delayed_ack_timer = nil; 662 session.delayed_ack_timer = nil;
659 end 663 end
660 return false; -- Kick the session 664 return false; -- Kick the session
661 end 665 end
662 session.log("debug", "Sending <r> (read timeout)"); 666 session.log("debug", "Sending <r> (read timeout)");
663 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); 667 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
664 session.awaiting_ack = true; 668 session.awaiting_ack = true;
665 if not session.delayed_ack_timer then 669 if not session.delayed_ack_timer then
666 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() 670 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
667 ack_delayed(session, nil); 671 ack_delayed(session, nil);
668 end); 672 end);
669 end 673 end
670 return true; 674 return true;
671 end 675 end