Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 11936:3f49c35607ca
mod_smacks: Fix timer lifetimes to follow session instead of module
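module:add_timer() creates a timer that stops working after the module
is reloaded or unloaded, in order to prevent leaks. However, when such
timers control vital session behavior, sessions break after a reload:
for example, they would stop requesting acks and stop responding to acks.
The timers are therefore now created with timer.add_task() and stopped
with timer.stop(), so that they follow the session rather than the module.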
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Wed, 24 Nov 2021 21:27:45 +0100 |
parent | 11935:4d0d10fabb82 |
child | 11937:364c3f018e3a |
Comparison legend: equal | deleted | inserted | replaced
11935:4d0d10fabb82 | 11936:3f49c35607ca |
---|---|
157 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any | 157 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any |
158 -- further requests until a higher h-value would be expected. | 158 -- further requests until a higher h-value would be expected. |
159 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then | 159 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then |
160 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); | 160 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); |
161 session.awaiting_ack = false; | 161 session.awaiting_ack = false; |
162 session.awaiting_ack_timer = module:add_timer(1e-06, function () | 162 session.awaiting_ack_timer = timer.add_task(1e-06, function () |
163 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); | 163 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); |
164 -- only request ack if needed and our session is not already hibernated or destroyed | 164 -- only request ack if needed and our session is not already hibernated or destroyed |
165 if not session.awaiting_ack and not session.hibernating and not session.destroyed then | 165 if not session.awaiting_ack and not session.hibernating and not session.destroyed then |
166 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); | 166 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); |
167 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 167 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
169 session.awaiting_ack = true; | 169 session.awaiting_ack = true; |
170 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) | 170 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
171 session.last_requested_h = session.last_acknowledged_stanza + #queue; | 171 session.last_requested_h = session.last_acknowledged_stanza + #queue; |
172 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); | 172 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); |
173 if not session.delayed_ack_timer then | 173 if not session.delayed_ack_timer then |
174 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() | 174 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
175 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue | 175 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
176 end); | 176 end); |
177 end | 177 end |
178 end | 178 end |
179 end); | 179 end); |
296 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); | 296 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); |
297 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); | 297 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); |
298 | 298 |
299 module:hook_tag("http://etherx.jabber.org/streams", "features", | 299 module:hook_tag("http://etherx.jabber.org/streams", "features", |
300 function (session, stanza) | 300 function (session, stanza) |
301 module:add_timer(1e-6, function () | 301 -- Needs to be done after flushing sendq since those aren't stored as |
302 -- stanzas and counting them is weird. | |
303 timer.add_task(1e-6, function () | |
302 if can_do_smacks(session) then | 304 if can_do_smacks(session) then |
303 if stanza:get_child("sm", xmlns_sm3) then | 305 if stanza:get_child("sm", xmlns_sm3) then |
304 session.sends2s(st.stanza("enable", sm3_attr)); | 306 session.sends2s(st.stanza("enable", sm3_attr)); |
305 session.smacks = xmlns_sm3; | 307 session.smacks = xmlns_sm3; |
306 elseif stanza:get_child("sm", xmlns_sm2) then | 308 elseif stanza:get_child("sm", xmlns_sm2) then |
347 | 349 |
348 function handle_a(origin, stanza) | 350 function handle_a(origin, stanza) |
349 if not origin.smacks then return; end | 351 if not origin.smacks then return; end |
350 origin.awaiting_ack = nil; | 352 origin.awaiting_ack = nil; |
351 if origin.awaiting_ack_timer then | 353 if origin.awaiting_ack_timer then |
352 origin.awaiting_ack_timer:stop(); | 354 timer.stop(origin.awaiting_ack_timer); |
355 origin.awaiting_ack_timer = nil; | |
353 end | 356 end |
354 if origin.delayed_ack_timer then | 357 if origin.delayed_ack_timer then |
355 origin.delayed_ack_timer:stop(); | 358 timer.stop(origin.delayed_ack_timer) |
356 origin.delayed_ack_timer = nil; | 359 origin.delayed_ack_timer = nil; |
357 end | 360 end |
358 -- Remove handled stanzas from outgoing_stanza_queue | 361 -- Remove handled stanzas from outgoing_stanza_queue |
359 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); | 362 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); |
360 local h = tonumber(stanza.attr.h); | 363 local h = tonumber(stanza.attr.h); |
649 local function handle_read_timeout(event) | 652 local function handle_read_timeout(event) |
650 local session = event.session; | 653 local session = event.session; |
651 if session.smacks then | 654 if session.smacks then |
652 if session.awaiting_ack then | 655 if session.awaiting_ack then |
653 if session.awaiting_ack_timer then | 656 if session.awaiting_ack_timer then |
654 session.awaiting_ack_timer:stop(); | 657 timer.stop(session.awaiting_ack_timer); |
658 session.awaiting_ack_timer = nil; | |
655 end | 659 end |
656 if session.delayed_ack_timer then | 660 if session.delayed_ack_timer then |
657 session.delayed_ack_timer:stop(); | 661 timer.stop(session.delayed_ack_timer); |
658 session.delayed_ack_timer = nil; | 662 session.delayed_ack_timer = nil; |
659 end | 663 end |
660 return false; -- Kick the session | 664 return false; -- Kick the session |
661 end | 665 end |
662 session.log("debug", "Sending <r> (read timeout)"); | 666 session.log("debug", "Sending <r> (read timeout)"); |
663 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); | 667 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); |
664 session.awaiting_ack = true; | 668 session.awaiting_ack = true; |
665 if not session.delayed_ack_timer then | 669 if not session.delayed_ack_timer then |
666 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() | 670 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
667 ack_delayed(session, nil); | 671 ack_delayed(session, nil); |
668 end); | 672 end); |
669 end | 673 end |
670 return true; | 674 return true; |
671 end | 675 end |