Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 12056:e62025f949f9
mod_smacks: Limit queue memory consumption using new util
This brings back the queue size limit that was once added and then removed,
because destroying the session upon reaching the limit was undesirable.
Instead, the queue wraps around and overwrites the oldest unacked stanza, on
the assumption that it will probably be acked anyway and thus does not need
to be delivered. If those discarded stanzas turn out to be needed on
resumption, then the resumption fails.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Tue, 14 Dec 2021 20:00:45 +0100 |
parent | 12054:0116fa57f05c |
child | 12059:70a55fbe447c |
comparison
equal
deleted
inserted
replaced
12055:daced16154fa | 12056:e62025f949f9 |
---|---|
11 -- COPYING file in the source package for more information. | 11 -- COPYING file in the source package for more information. |
12 -- | 12 -- |
13 | 13 |
14 local tonumber = tonumber; | 14 local tonumber = tonumber; |
15 local tostring = tostring; | 15 local tostring = tostring; |
16 local math_min = math.min; | |
17 local os_time = os.time; | 16 local os_time = os.time; |
18 local t_remove = table.remove; | |
19 | 17 |
20 local datetime = require "util.datetime"; | 18 local datetime = require "util.datetime"; |
21 local add_filter = require "util.filters".add_filter; | 19 local add_filter = require "util.filters".add_filter; |
22 local jid = require "util.jid"; | 20 local jid = require "util.jid"; |
21 local smqueue = require "util.smqueue"; | |
23 local st = require "util.stanza"; | 22 local st = require "util.stanza"; |
24 local timer = require "util.timer"; | 23 local timer = require "util.timer"; |
25 local uuid_generate = require "util.uuid".generate; | 24 local uuid_generate = require "util.uuid".generate; |
26 local watchdog = require "util.watchdog"; | 25 local watchdog = require "util.watchdog"; |
27 | 26 |
35 local xmlns_sm3 = "urn:xmpp:sm:3"; | 34 local xmlns_sm3 = "urn:xmpp:sm:3"; |
36 | 35 |
37 local sm2_attr = { xmlns = xmlns_sm2 }; | 36 local sm2_attr = { xmlns = xmlns_sm2 }; |
38 local sm3_attr = { xmlns = xmlns_sm3 }; | 37 local sm3_attr = { xmlns = xmlns_sm3 }; |
39 | 38 |
39 local queue_size = module:get_option_number("smacks_max_queue_size", 500); | |
40 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600); | 40 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600); |
41 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); | 41 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); |
42 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); | 42 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); |
43 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); | 43 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); |
44 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); | 44 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); |
49 local function format_h(h) if h then return string.format("%d", h) end end | 49 local function format_h(h) if h then return string.format("%d", h) end end |
50 | 50 |
51 local old_session_registry = module:open_store("smacks_h", "map"); | 51 local old_session_registry = module:open_store("smacks_h", "map"); |
52 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource | 52 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource |
53 | 53 |
54 local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, { | |
55 head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" }; | |
56 tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" }; | |
57 pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" }; | |
58 overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" } | |
59 }); | |
60 | |
61 -- COMPAT note the use of compatibilty wrapper in events (queue:table()) | |
62 | |
54 local function ack_delayed(session, stanza) | 63 local function ack_delayed(session, stanza) |
55 -- fire event only if configured to do so and our session is not already hibernated or destroyed | 64 -- fire event only if configured to do so and our session is not already hibernated or destroyed |
56 if delayed_ack_timeout > 0 and session.awaiting_ack | 65 if delayed_ack_timeout > 0 and session.awaiting_ack |
57 and not session.hibernating and not session.destroyed then | 66 and not session.hibernating and not session.destroyed then |
58 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", | 67 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", |
59 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); | 68 session.outgoing_stanza_queue and session.outgoing_stanza_queue:count_unacked() or 0); |
60 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza}); | 69 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue:table(), stanza = stanza}); |
61 end | 70 end |
62 session.delayed_ack_timer = nil; | 71 session.delayed_ack_timer = nil; |
63 end | 72 end |
64 | 73 |
65 local function can_do_smacks(session, advertise_only) | 74 local function can_do_smacks(session, advertise_only) |
99 if not session.smacks then return end -- not using | 108 if not session.smacks then return end -- not using |
100 if session.hibernating then return end -- can't ack when asleep | 109 if session.hibernating then return end -- can't ack when asleep |
101 if session.awaiting_ack then return end -- already waiting | 110 if session.awaiting_ack then return end -- already waiting |
102 if force then return force end | 111 if force then return force end |
103 local queue = session.outgoing_stanza_queue; | 112 local queue = session.outgoing_stanza_queue; |
104 local expected_h = session.last_acknowledged_stanza + #queue; | 113 local expected_h = session.last_acknowledged_stanza + queue:count_unacked(); |
105 local max_unacked = max_unacked_stanzas; | 114 local max_unacked = max_unacked_stanzas; |
106 if session.state == "inactive" then | 115 if session.state == "inactive" then |
107 max_unacked = max_inactive_unacked_stanzas; | 116 max_unacked = max_inactive_unacked_stanzas; |
108 end | 117 end |
109 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong | 118 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong |
110 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any | 119 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any |
111 -- further requests until a higher h-value would be expected. | 120 -- further requests until a higher h-value would be expected. |
112 return #queue > max_unacked and expected_h ~= session.last_requested_h; | 121 return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h; |
113 end | 122 end |
114 | 123 |
115 local function request_ack(session, reason) | 124 local function request_ack(session, reason) |
116 local queue = session.outgoing_stanza_queue; | 125 local queue = session.outgoing_stanza_queue; |
117 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); | 126 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); |
118 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 127 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
119 if session.destroyed then return end -- sending something can trigger destruction | 128 if session.destroyed then return end -- sending something can trigger destruction |
120 session.awaiting_ack = true; | 129 session.awaiting_ack = true; |
121 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) | 130 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
122 session.last_requested_h = session.last_acknowledged_stanza + #queue; | 131 session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked(); |
123 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); | 132 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked()); |
124 if not session.delayed_ack_timer then | 133 if not session.delayed_ack_timer then |
125 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() | 134 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
126 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue | 135 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
127 end); | 136 end); |
128 end | 137 end |
148 local queue = session.outgoing_stanza_queue; | 157 local queue = session.outgoing_stanza_queue; |
149 local max_unacked = max_unacked_stanzas; | 158 local max_unacked = max_unacked_stanzas; |
150 if session.state == "inactive" then | 159 if session.state == "inactive" then |
151 max_unacked = max_inactive_unacked_stanzas; | 160 max_unacked = max_inactive_unacked_stanzas; |
152 end | 161 end |
153 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then | 162 if queue:count_unacked() > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then |
154 session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); | 163 session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); |
155 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules | 164 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules |
156 end | 165 end |
157 end | 166 end |
158 | 167 |
176 from = jid.bare(session.full_jid or session.host), | 185 from = jid.bare(session.full_jid or session.host), |
177 stamp = datetime.datetime() | 186 stamp = datetime.datetime() |
178 }); | 187 }); |
179 end | 188 end |
180 | 189 |
181 queue[#queue+1] = cached_stanza; | 190 queue:push(cached_stanza); |
191 | |
182 if session.hibernating then | 192 if session.hibernating then |
183 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); | 193 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); |
184 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); | 194 -- FIXME queue implementation changed, anything depending on it being an array will break |
195 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza}); | |
185 return nil; | 196 return nil; |
186 end | 197 end |
187 end | 198 end |
188 return stanza; | 199 return stanza; |
189 end | 200 end |
196 return stanza; | 207 return stanza; |
197 end | 208 end |
198 | 209 |
199 local function wrap_session_out(session, resume) | 210 local function wrap_session_out(session, resume) |
200 if not resume then | 211 if not resume then |
201 session.outgoing_stanza_queue = {}; | 212 session.outgoing_stanza_queue = smqueue.new(queue_size); |
202 session.last_acknowledged_stanza = 0; | 213 session.last_acknowledged_stanza = 0; |
203 end | 214 end |
204 | 215 |
205 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); | 216 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); |
206 | 217 |
322 if origin.delayed_ack_timer then | 333 if origin.delayed_ack_timer then |
323 timer.stop(origin.delayed_ack_timer) | 334 timer.stop(origin.delayed_ack_timer) |
324 origin.delayed_ack_timer = nil; | 335 origin.delayed_ack_timer = nil; |
325 end | 336 end |
326 -- Remove handled stanzas from outgoing_stanza_queue | 337 -- Remove handled stanzas from outgoing_stanza_queue |
327 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); | |
328 local h = tonumber(stanza.attr.h); | 338 local h = tonumber(stanza.attr.h); |
329 if not h then | 339 if not h then |
330 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; | 340 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; |
331 return; | 341 return; |
332 end | 342 end |
333 local handled_stanza_count = h-origin.last_acknowledged_stanza; | |
334 local queue = origin.outgoing_stanza_queue; | 343 local queue = origin.outgoing_stanza_queue; |
335 if handled_stanza_count > #queue then | 344 local handled_stanza_count = h-queue:count_acked(); |
336 origin.log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", | 345 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked |
337 handled_stanza_count, #queue); | 346 if err then |
338 origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza); | 347 origin.log("warn", "The client says it handled %d new stanzas, but we sent %d :)", |
339 for i=1,#queue do | 348 handled_stanza_count, queue:count_unacked()); |
340 origin.log("debug", "Q item %d: %s", i, tostring(queue[i])); | 349 origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), queue:count_acked()); |
341 end | 350 for i, item in queue._queue:items() do |
342 origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; }; | 351 origin.log("debug", "Q item %d: %s", i, item); |
352 end | |
353 origin:close(err); | |
343 return; | 354 return; |
344 end | 355 end |
345 | 356 |
346 for _=1,math_min(handled_stanza_count,#queue) do | 357 origin.log("debug", "#queue = %d", queue:count_unacked()); |
347 t_remove(origin.outgoing_stanza_queue, 1); | |
348 end | |
349 | |
350 origin.log("debug", "#queue = %d", #queue); | |
351 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; | |
352 request_ack_now_if_needed(origin, false, "handle_a", nil) | 358 request_ack_now_if_needed(origin, false, "handle_a", nil) |
353 return true; | 359 return true; |
354 end | 360 end |
355 module:hook_tag(xmlns_sm2, "a", handle_a); | 361 module:hook_tag(xmlns_sm2, "a", handle_a); |
356 module:hook_tag(xmlns_sm3, "a", handle_a); | 362 module:hook_tag(xmlns_sm3, "a", handle_a); |
357 | 363 |
358 local function handle_unacked_stanzas(session) | 364 local function handle_unacked_stanzas(session) |
359 local queue = session.outgoing_stanza_queue; | 365 local queue = session.outgoing_stanza_queue; |
360 if #queue > 0 then | 366 if queue:count_unacked() > 0 then |
361 session.outgoing_stanza_queue = {}; | 367 session.smacks = false; -- Disable queueing |
362 for i=1,#queue do | 368 session.outgoing_stanza_queue = nil; |
363 if not module:fire_event("delivery/failure", { session = session, stanza = queue[i] }) then | 369 for stanza in queue._queue:consume() do |
364 if queue[i].attr.type ~= "error" and queue[i].attr.from ~= session.full_jid then | 370 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then |
365 local reply = st.error_reply(queue[i], "cancel", "recipient-unavailable"); | 371 if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then |
372 local reply = st.error_reply(stanza, "cancel", "recipient-unavailable"); | |
366 core_process_stanza(session, reply); | 373 core_process_stanza(session, reply); |
367 end | 374 end |
368 end | 375 end |
369 end | 376 end |
370 end | 377 end |
414 module:hook("pre-resource-unbind", function (event) | 421 module:hook("pre-resource-unbind", function (event) |
415 local session = event.session; | 422 local session = event.session; |
416 if not session.smacks then return end | 423 if not session.smacks then return end |
417 if not session.resumption_token then | 424 if not session.resumption_token then |
418 local queue = session.outgoing_stanza_queue; | 425 local queue = session.outgoing_stanza_queue; |
419 if #queue > 0 then | 426 if queue:count_unacked() > 0 then |
420 session.log("debug", "Destroying session with %d unacked stanzas", #queue); | 427 session.log("debug", "Destroying session with %d unacked stanzas", queue:count_unacked()); |
421 handle_unacked_stanzas(session); | 428 handle_unacked_stanzas(session); |
422 end | 429 end |
423 return | 430 return |
424 end | 431 end |
425 | 432 |
438 sessionmanager.destroy_session(session, "Hibernating too long"); | 445 sessionmanager.destroy_session(session, "Hibernating too long"); |
439 end); | 446 end); |
440 if session.conn then | 447 if session.conn then |
441 session.conn:close(); | 448 session.conn:close(); |
442 end | 449 end |
443 module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue }); | 450 module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue:table() }); |
444 return true; -- Postpone destruction for now | 451 return true; -- Postpone destruction for now |
445 end); | 452 end); |
446 | 453 |
447 local function handle_s2s_destroyed(event) | 454 local function handle_s2s_destroyed(event) |
448 local session = event.session; | 455 local session = event.session; |
449 local queue = session.outgoing_stanza_queue; | 456 local queue = session.outgoing_stanza_queue; |
450 if queue and #queue > 0 then | 457 if queue and queue:count_unacked() > 0 then |
451 session.log("warn", "Destroying session with %d unacked stanzas", #queue); | 458 session.log("warn", "Destroying session with %d unacked stanzas", queue:count_unacked()); |
452 if s2s_resend then | 459 if s2s_resend then |
453 for i = 1, #queue do | 460 for stanza in queue:consume() do |
454 module:send(queue[i]); | 461 module:send(stanza); |
455 end | 462 end |
456 session.outgoing_stanza_queue = nil; | 463 session.outgoing_stanza_queue = nil; |
457 else | 464 else |
458 handle_unacked_stanzas(session); | 465 handle_unacked_stanzas(session); |
459 end | 466 end |
503 original_session.log("debug", "mod_smacks closing an old connection for this session"); | 510 original_session.log("debug", "mod_smacks closing an old connection for this session"); |
504 local conn = original_session.conn; | 511 local conn = original_session.conn; |
505 c2s_sessions[conn] = nil; | 512 c2s_sessions[conn] = nil; |
506 conn:close(); | 513 conn:close(); |
507 end | 514 end |
515 | |
508 local migrated_session_log = session.log; | 516 local migrated_session_log = session.log; |
509 original_session.ip = session.ip; | 517 original_session.ip = session.ip; |
510 original_session.conn = session.conn; | 518 original_session.conn = session.conn; |
511 original_session.rawsend = session.rawsend; | 519 original_session.rawsend = session.rawsend; |
512 original_session.rawsend.session = original_session; | 520 original_session.rawsend.session = original_session; |
528 -- Inform xmppstream of the new session (passed to its callbacks) | 536 -- Inform xmppstream of the new session (passed to its callbacks) |
529 original_session.stream:set_session(original_session); | 537 original_session.stream:set_session(original_session); |
530 -- Similar for connlisteners | 538 -- Similar for connlisteners |
531 c2s_sessions[session.conn] = original_session; | 539 c2s_sessions[session.conn] = original_session; |
532 | 540 |
541 local queue = original_session.outgoing_stanza_queue; | |
542 local h = tonumber(stanza.attr.h); | |
543 | |
544 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) | |
545 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked | |
546 | |
547 if not err and not queue:resumable() then | |
548 err = ack_errors.new("overflow"); | |
549 end | |
550 | |
551 if err or not queue:resumable() then | |
552 original_session.send(st.stanza("failed", | |
553 { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id })); | |
554 original_session:close(err); | |
555 return false; | |
556 end | |
557 | |
533 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, | 558 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, |
534 h = format_h(original_session.handled_stanza_count), previd = id })); | 559 h = format_h(original_session.handled_stanza_count), previd = id })); |
535 | |
536 -- Fake an <a> with the h of the <resume/> from the client | |
537 original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm, | |
538 h = stanza.attr.h })); | |
539 | 560 |
540 -- Ok, we need to re-send any stanzas that the client didn't see | 561 -- Ok, we need to re-send any stanzas that the client didn't see |
541 -- ...they are what is now left in the outgoing stanza queue | 562 -- ...they are what is now left in the outgoing stanza queue |
542 -- We have to use the send of "session" because we don't want to add our resent stanzas | 563 -- We have to use the send of "session" because we don't want to add our resent stanzas |
543 -- to the outgoing queue again | 564 -- to the outgoing queue again |
544 local queue = original_session.outgoing_stanza_queue; | 565 |
545 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", #queue); | 566 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); |
546 -- FIXME Which session is it that the queue filter sees? | 567 -- FIXME Which session is it that the queue filter sees? |
547 session.resending_unacked = true; | 568 session.resending_unacked = true; |
548 original_session.resending_unacked = true; | 569 original_session.resending_unacked = true; |
549 for i=1,#queue do | 570 for _, queued_stanza in queue:resume() do |
550 session.send(queue[i]); | 571 session.send(queued_stanza); |
551 end | 572 end |
552 session.resending_unacked = nil; | 573 session.resending_unacked = nil; |
553 original_session.resending_unacked = nil; | 574 original_session.resending_unacked = nil; |
554 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue); | 575 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked()); |
555 function session.send(stanza) -- luacheck: ignore 432 | 576 function session.send(stanza) -- luacheck: ignore 432 |
556 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | 577 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); |
557 return false; | 578 return false; |
558 end | 579 end |
559 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); | 580 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); |
560 request_ack_if_needed(original_session, true, "handle_resume", nil); | 581 request_ack_if_needed(original_session, true, "handle_resume", nil); |
561 end | 582 end |
562 return true; | 583 return true; |
563 end | 584 end |
564 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 585 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |