Comparison

plugins/mod_smacks.lua @ 12056:e62025f949f9

mod_smacks: Limit queue memory consumption using new util.smqueue. This brings back the queue size limit that was once added and then removed, because destroying the session when reaching the limit was not great. Instead, the queue now wraps and overwrites the oldest unacked stanza, on the assumption that it will probably be acked anyway and thus does not need to be delivered. If those discarded stanzas turn out to be needed on resumption, the resumption fails.
author Kim Alvefur <zash@zash.se>
date Tue, 14 Dec 2021 20:00:45 +0100
parent 12054:0116fa57f05c
child 12059:70a55fbe447c
comparison
equal deleted inserted replaced
12055:daced16154fa 12056:e62025f949f9
11 -- COPYING file in the source package for more information. 11 -- COPYING file in the source package for more information.
12 -- 12 --
13 13
14 local tonumber = tonumber; 14 local tonumber = tonumber;
15 local tostring = tostring; 15 local tostring = tostring;
16 local math_min = math.min;
17 local os_time = os.time; 16 local os_time = os.time;
18 local t_remove = table.remove;
19 17
20 local datetime = require "util.datetime"; 18 local datetime = require "util.datetime";
21 local add_filter = require "util.filters".add_filter; 19 local add_filter = require "util.filters".add_filter;
22 local jid = require "util.jid"; 20 local jid = require "util.jid";
21 local smqueue = require "util.smqueue";
23 local st = require "util.stanza"; 22 local st = require "util.stanza";
24 local timer = require "util.timer"; 23 local timer = require "util.timer";
25 local uuid_generate = require "util.uuid".generate; 24 local uuid_generate = require "util.uuid".generate;
26 local watchdog = require "util.watchdog"; 25 local watchdog = require "util.watchdog";
27 26
35 local xmlns_sm3 = "urn:xmpp:sm:3"; 34 local xmlns_sm3 = "urn:xmpp:sm:3";
36 35
37 local sm2_attr = { xmlns = xmlns_sm2 }; 36 local sm2_attr = { xmlns = xmlns_sm2 };
38 local sm3_attr = { xmlns = xmlns_sm3 }; 37 local sm3_attr = { xmlns = xmlns_sm3 };
39 38
39 local queue_size = module:get_option_number("smacks_max_queue_size", 500);
40 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600); 40 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600);
41 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); 41 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true);
42 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); 42 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false);
43 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); 43 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
44 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); 44 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256);
49 local function format_h(h) if h then return string.format("%d", h) end end 49 local function format_h(h) if h then return string.format("%d", h) end end
50 50
51 local old_session_registry = module:open_store("smacks_h", "map"); 51 local old_session_registry = module:open_store("smacks_h", "map");
52 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource 52 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource
53 53
54 local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, {
55 head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" };
56 tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" };
57 pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" };
58 overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" }
59 });
60
61 -- COMPAT note the use of compatibility wrapper in events (queue:table())
62
54 local function ack_delayed(session, stanza) 63 local function ack_delayed(session, stanza)
55 -- fire event only if configured to do so and our session is not already hibernated or destroyed 64 -- fire event only if configured to do so and our session is not already hibernated or destroyed
56 if delayed_ack_timeout > 0 and session.awaiting_ack 65 if delayed_ack_timeout > 0 and session.awaiting_ack
57 and not session.hibernating and not session.destroyed then 66 and not session.hibernating and not session.destroyed then
58 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", 67 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d",
59 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); 68 session.outgoing_stanza_queue and session.outgoing_stanza_queue:count_unacked() or 0);
60 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza}); 69 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue:table(), stanza = stanza});
61 end 70 end
62 session.delayed_ack_timer = nil; 71 session.delayed_ack_timer = nil;
63 end 72 end
64 73
65 local function can_do_smacks(session, advertise_only) 74 local function can_do_smacks(session, advertise_only)
99 if not session.smacks then return end -- not using 108 if not session.smacks then return end -- not using
100 if session.hibernating then return end -- can't ack when asleep 109 if session.hibernating then return end -- can't ack when asleep
101 if session.awaiting_ack then return end -- already waiting 110 if session.awaiting_ack then return end -- already waiting
102 if force then return force end 111 if force then return force end
103 local queue = session.outgoing_stanza_queue; 112 local queue = session.outgoing_stanza_queue;
104 local expected_h = session.last_acknowledged_stanza + #queue; 113 local expected_h = session.last_acknowledged_stanza + queue:count_unacked();
105 local max_unacked = max_unacked_stanzas; 114 local max_unacked = max_unacked_stanzas;
106 if session.state == "inactive" then 115 if session.state == "inactive" then
107 max_unacked = max_inactive_unacked_stanzas; 116 max_unacked = max_inactive_unacked_stanzas;
108 end 117 end
109 -- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong 118 -- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong
110 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any 119 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
111 -- further requests until a higher h-value would be expected. 120 -- further requests until a higher h-value would be expected.
112 return #queue > max_unacked and expected_h ~= session.last_requested_h; 121 return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h;
113 end 122 end
114 123
115 local function request_ack(session, reason) 124 local function request_ack(session, reason)
116 local queue = session.outgoing_stanza_queue; 125 local queue = session.outgoing_stanza_queue;
117 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); 126 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked());
118 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 127 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
119 if session.destroyed then return end -- sending something can trigger destruction 128 if session.destroyed then return end -- sending something can trigger destruction
120 session.awaiting_ack = true; 129 session.awaiting_ack = true;
121 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) 130 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
122 session.last_requested_h = session.last_acknowledged_stanza + #queue; 131 session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked();
123 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); 132 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
124 if not session.delayed_ack_timer then 133 if not session.delayed_ack_timer then
125 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() 134 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
126 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue 135 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
127 end); 136 end);
128 end 137 end
148 local queue = session.outgoing_stanza_queue; 157 local queue = session.outgoing_stanza_queue;
149 local max_unacked = max_unacked_stanzas; 158 local max_unacked = max_unacked_stanzas;
150 if session.state == "inactive" then 159 if session.state == "inactive" then
151 max_unacked = max_inactive_unacked_stanzas; 160 max_unacked = max_inactive_unacked_stanzas;
152 end 161 end
153 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then 162 if queue:count_unacked() > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
154 session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); 163 session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
155 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules 164 ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
156 end 165 end
157 end 166 end
158 167
176 from = jid.bare(session.full_jid or session.host), 185 from = jid.bare(session.full_jid or session.host),
177 stamp = datetime.datetime() 186 stamp = datetime.datetime()
178 }); 187 });
179 end 188 end
180 189
181 queue[#queue+1] = cached_stanza; 190 queue:push(cached_stanza);
191
182 if session.hibernating then 192 if session.hibernating then
183 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); 193 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
184 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); 194 -- FIXME queue implementation changed, anything depending on it being an array will break
195 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza});
185 return nil; 196 return nil;
186 end 197 end
187 end 198 end
188 return stanza; 199 return stanza;
189 end 200 end
196 return stanza; 207 return stanza;
197 end 208 end
198 209
199 local function wrap_session_out(session, resume) 210 local function wrap_session_out(session, resume)
200 if not resume then 211 if not resume then
201 session.outgoing_stanza_queue = {}; 212 session.outgoing_stanza_queue = smqueue.new(queue_size);
202 session.last_acknowledged_stanza = 0; 213 session.last_acknowledged_stanza = 0;
203 end 214 end
204 215
205 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); 216 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
206 217
322 if origin.delayed_ack_timer then 333 if origin.delayed_ack_timer then
323 timer.stop(origin.delayed_ack_timer) 334 timer.stop(origin.delayed_ack_timer)
324 origin.delayed_ack_timer = nil; 335 origin.delayed_ack_timer = nil;
325 end 336 end
326 -- Remove handled stanzas from outgoing_stanza_queue 337 -- Remove handled stanzas from outgoing_stanza_queue
327 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
328 local h = tonumber(stanza.attr.h); 338 local h = tonumber(stanza.attr.h);
329 if not h then 339 if not h then
330 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; 340 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; };
331 return; 341 return;
332 end 342 end
333 local handled_stanza_count = h-origin.last_acknowledged_stanza;
334 local queue = origin.outgoing_stanza_queue; 343 local queue = origin.outgoing_stanza_queue;
335 if handled_stanza_count > #queue then 344 local handled_stanza_count = h-queue:count_acked();
336 origin.log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", 345 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
337 handled_stanza_count, #queue); 346 if err then
338 origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza); 347 origin.log("warn", "The client says it handled %d new stanzas, but we sent %d :)",
339 for i=1,#queue do 348 handled_stanza_count, queue:count_unacked());
340 origin.log("debug", "Q item %d: %s", i, tostring(queue[i])); 349 origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), queue:count_acked());
341 end 350 for i, item in queue._queue:items() do
342 origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; }; 351 origin.log("debug", "Q item %d: %s", i, item);
352 end
353 origin:close(err);
343 return; 354 return;
344 end 355 end
345 356
346 for _=1,math_min(handled_stanza_count,#queue) do 357 origin.log("debug", "#queue = %d", queue:count_unacked());
347 t_remove(origin.outgoing_stanza_queue, 1);
348 end
349
350 origin.log("debug", "#queue = %d", #queue);
351 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
352 request_ack_now_if_needed(origin, false, "handle_a", nil) 358 request_ack_now_if_needed(origin, false, "handle_a", nil)
353 return true; 359 return true;
354 end 360 end
355 module:hook_tag(xmlns_sm2, "a", handle_a); 361 module:hook_tag(xmlns_sm2, "a", handle_a);
356 module:hook_tag(xmlns_sm3, "a", handle_a); 362 module:hook_tag(xmlns_sm3, "a", handle_a);
357 363
358 local function handle_unacked_stanzas(session) 364 local function handle_unacked_stanzas(session)
359 local queue = session.outgoing_stanza_queue; 365 local queue = session.outgoing_stanza_queue;
360 if #queue > 0 then 366 if queue:count_unacked() > 0 then
361 session.outgoing_stanza_queue = {}; 367 session.smacks = false; -- Disable queueing
362 for i=1,#queue do 368 session.outgoing_stanza_queue = nil;
363 if not module:fire_event("delivery/failure", { session = session, stanza = queue[i] }) then 369 for stanza in queue._queue:consume() do
364 if queue[i].attr.type ~= "error" and queue[i].attr.from ~= session.full_jid then 370 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then
365 local reply = st.error_reply(queue[i], "cancel", "recipient-unavailable"); 371 if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then
372 local reply = st.error_reply(stanza, "cancel", "recipient-unavailable");
366 core_process_stanza(session, reply); 373 core_process_stanza(session, reply);
367 end 374 end
368 end 375 end
369 end 376 end
370 end 377 end
414 module:hook("pre-resource-unbind", function (event) 421 module:hook("pre-resource-unbind", function (event)
415 local session = event.session; 422 local session = event.session;
416 if not session.smacks then return end 423 if not session.smacks then return end
417 if not session.resumption_token then 424 if not session.resumption_token then
418 local queue = session.outgoing_stanza_queue; 425 local queue = session.outgoing_stanza_queue;
419 if #queue > 0 then 426 if queue:count_unacked() > 0 then
420 session.log("debug", "Destroying session with %d unacked stanzas", #queue); 427 session.log("debug", "Destroying session with %d unacked stanzas", queue:count_unacked());
421 handle_unacked_stanzas(session); 428 handle_unacked_stanzas(session);
422 end 429 end
423 return 430 return
424 end 431 end
425 432
438 sessionmanager.destroy_session(session, "Hibernating too long"); 445 sessionmanager.destroy_session(session, "Hibernating too long");
439 end); 446 end);
440 if session.conn then 447 if session.conn then
441 session.conn:close(); 448 session.conn:close();
442 end 449 end
443 module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue }); 450 module:fire_event("smacks-hibernation-start", { origin = session; queue = session.outgoing_stanza_queue:table() });
444 return true; -- Postpone destruction for now 451 return true; -- Postpone destruction for now
445 end); 452 end);
446 453
447 local function handle_s2s_destroyed(event) 454 local function handle_s2s_destroyed(event)
448 local session = event.session; 455 local session = event.session;
449 local queue = session.outgoing_stanza_queue; 456 local queue = session.outgoing_stanza_queue;
450 if queue and #queue > 0 then 457 if queue and queue:count_unacked() > 0 then
451 session.log("warn", "Destroying session with %d unacked stanzas", #queue); 458 session.log("warn", "Destroying session with %d unacked stanzas", queue:count_unacked());
452 if s2s_resend then 459 if s2s_resend then
453 for i = 1, #queue do 460 for stanza in queue:consume() do
454 module:send(queue[i]); 461 module:send(stanza);
455 end 462 end
456 session.outgoing_stanza_queue = nil; 463 session.outgoing_stanza_queue = nil;
457 else 464 else
458 handle_unacked_stanzas(session); 465 handle_unacked_stanzas(session);
459 end 466 end
503 original_session.log("debug", "mod_smacks closing an old connection for this session"); 510 original_session.log("debug", "mod_smacks closing an old connection for this session");
504 local conn = original_session.conn; 511 local conn = original_session.conn;
505 c2s_sessions[conn] = nil; 512 c2s_sessions[conn] = nil;
506 conn:close(); 513 conn:close();
507 end 514 end
515
508 local migrated_session_log = session.log; 516 local migrated_session_log = session.log;
509 original_session.ip = session.ip; 517 original_session.ip = session.ip;
510 original_session.conn = session.conn; 518 original_session.conn = session.conn;
511 original_session.rawsend = session.rawsend; 519 original_session.rawsend = session.rawsend;
512 original_session.rawsend.session = original_session; 520 original_session.rawsend.session = original_session;
528 -- Inform xmppstream of the new session (passed to its callbacks) 536 -- Inform xmppstream of the new session (passed to its callbacks)
529 original_session.stream:set_session(original_session); 537 original_session.stream:set_session(original_session);
530 -- Similar for connlisteners 538 -- Similar for connlisteners
531 c2s_sessions[session.conn] = original_session; 539 c2s_sessions[session.conn] = original_session;
532 540
541 local queue = original_session.outgoing_stanza_queue;
542 local h = tonumber(stanza.attr.h);
543
544 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
545 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
546
547 if not err and not queue:resumable() then
548 err = ack_errors.new("overflow");
549 end
550
551 if err or not queue:resumable() then
552 original_session.send(st.stanza("failed",
553 { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id }));
554 original_session:close(err);
555 return false;
556 end
557
533 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, 558 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm,
534 h = format_h(original_session.handled_stanza_count), previd = id })); 559 h = format_h(original_session.handled_stanza_count), previd = id }));
535
536 -- Fake an <a> with the h of the <resume/> from the client
537 original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm,
538 h = stanza.attr.h }));
539 560
540 -- Ok, we need to re-send any stanzas that the client didn't see 561 -- Ok, we need to re-send any stanzas that the client didn't see
541 -- ...they are what is now left in the outgoing stanza queue 562 -- ...they are what is now left in the outgoing stanza queue
542 -- We have to use the send of "session" because we don't want to add our resent stanzas 563 -- We have to use the send of "session" because we don't want to add our resent stanzas
543 -- to the outgoing queue again 564 -- to the outgoing queue again
544 local queue = original_session.outgoing_stanza_queue; 565
545 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", #queue); 566 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
546 -- FIXME Which session is it that the queue filter sees? 567 -- FIXME Which session is it that the queue filter sees?
547 session.resending_unacked = true; 568 session.resending_unacked = true;
548 original_session.resending_unacked = true; 569 original_session.resending_unacked = true;
549 for i=1,#queue do 570 for _, queued_stanza in queue:resume() do
550 session.send(queue[i]); 571 session.send(queued_stanza);
551 end 572 end
552 session.resending_unacked = nil; 573 session.resending_unacked = nil;
553 original_session.resending_unacked = nil; 574 original_session.resending_unacked = nil;
554 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue); 575 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked());
555 function session.send(stanza) -- luacheck: ignore 432 576 function session.send(stanza) -- luacheck: ignore 432
556 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); 577 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
557 return false; 578 return false;
558 end 579 end
559 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); 580 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
560 request_ack_if_needed(original_session, true, "handle_resume", nil); 581 request_ack_if_needed(original_session, true, "handle_resume", nil);
561 end 582 end
562 return true; 583 return true;
563 end 584 end
564 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); 585 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);