Comparison

plugins/mod_smacks.lua @ 12802:4a8740e01813

Merge 0.12->trunk
author Kim Alvefur <zash@zash.se>
date Mon, 12 Dec 2022 07:10:54 +0100
parent 12800:06ba2f8cee47
child 12803:2e12290820e8
comparison
equal deleted inserted replaced
12801:ebd6b4d8bf04 12802:4a8740e01813
1 -- XEP-0198: Stream Management for Prosody IM 1 -- XEP-0198: Stream Management for Prosody IM
2 -- 2 --
3 -- Copyright (C) 2010-2015 Matthew Wild 3 -- Copyright (C) 2010-2015 Matthew Wild
4 -- Copyright (C) 2010 Waqas Hussain 4 -- Copyright (C) 2010 Waqas Hussain
5 -- Copyright (C) 2012-2021 Kim Alvefur 5 -- Copyright (C) 2012-2022 Kim Alvefur
6 -- Copyright (C) 2012 Thijs Alkemade 6 -- Copyright (C) 2012 Thijs Alkemade
7 -- Copyright (C) 2014 Florian Zeitz 7 -- Copyright (C) 2014 Florian Zeitz
8 -- Copyright (C) 2016-2020 Thilo Molitor 8 -- Copyright (C) 2016-2020 Thilo Molitor
9 -- 9 --
10 -- This project is MIT/X11 licensed. Please see the 10 -- This project is MIT/X11 licensed. Please see the
11 -- COPYING file in the source package for more information. 11 -- COPYING file in the source package for more information.
12 -- 12 --
13 -- TODO unify sendq and smqueue
13 14
14 local tonumber = tonumber; 15 local tonumber = tonumber;
15 local tostring = tostring; 16 local tostring = tostring;
16 local os_time = os.time; 17 local os_time = os.time;
17 18
81 82
82 local all_old_sessions = module:open_store("smacks_h"); 83 local all_old_sessions = module:open_store("smacks_h");
83 local old_session_registry = module:open_store("smacks_h", "map"); 84 local old_session_registry = module:open_store("smacks_h", "map");
84 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource 85 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource
85 86
87 local function track_session(session, id)
88 session_registry[jid.join(session.username, session.host, id or session.resumption_token)] = session;
89 session.resumption_token = id;
90 end
91
92 local function save_old_session(session)
93 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
94 return old_session_registry:set(session.username, session.resumption_token,
95 { h = session.handled_stanza_count; t = os.time() })
96 end
97
98 local function clear_old_session(session, id)
99 session_registry[jid.join(session.username, session.host, id or session.resumption_token)] = nil;
100 return old_session_registry:set(session.username, id or session.resumption_token, nil)
101 end
102
86 local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, { 103 local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, {
87 head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" }; 104 head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" };
88 tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" }; 105 tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" };
89 pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" }; 106 pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" };
90 overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" } 107 overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" }
108 });
109
110 local enable_errors = require "util.error".init("mod_smacks", xmlns_sm3, {
111 already_enabled = { condition = "unexpected-request", text = "Stream management is already enabled" };
112 bind_required = { condition = "unexpected-request", text = "Client must bind a resource before enabling stream management" };
113 unavailable = { condition = "service-unavailable", text = "Stream management is not available for this stream" };
114 -- Resumption
115 expired = { condition = "item-not-found", text = "Session expired, and cannot be resumed" };
116 already_bound = { condition = "unexpected-request", text = "Cannot resume another session after a resource is bound" };
117 unknown_session = { condition = "item-not-found", text = "Unknown session" };
91 }); 118 });
92 119
93 -- COMPAT note the use of compatibility wrapper in events (queue:table()) 120 -- COMPAT note the use of compatibility wrapper in events (queue:table())
94 121
95 local function ack_delayed(session, stanza) 122 local function ack_delayed(session, stanza)
102 end 129 end
103 session.delayed_ack_timer = nil; 130 session.delayed_ack_timer = nil;
104 end 131 end
105 132
106 local function can_do_smacks(session, advertise_only) 133 local function can_do_smacks(session, advertise_only)
107 if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end 134 if session.smacks then return false, enable_errors.new("already_enabled"); end
108 135
109 local session_type = session.type; 136 local session_type = session.type;
110 if session.username then 137 if session.username then
111 if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm 138 if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm
112 return false, "unexpected-request", "Client must bind a resource before enabling stream management"; 139 return false, enable_errors.new("bind_required");
113 end 140 end
114 return true; 141 return true;
115 elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then 142 elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then
116 return true; 143 return true;
117 end 144 end
118 return false, "service-unavailable", "Stream management is not available for this stream"; 145 return false, enable_errors.new("unavailable");
119 end 146 end
120 147
121 module:hook("stream-features", 148 module:hook("stream-features",
122 function (event) 149 function (event)
123 if can_do_smacks(event.origin, true) then 150 if can_do_smacks(event.origin, true) then
153 return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h; 180 return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h;
154 end 181 end
155 182
156 local function request_ack(session, reason) 183 local function request_ack(session, reason)
157 local queue = session.outgoing_stanza_queue; 184 local queue = session.outgoing_stanza_queue;
158 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); 185 session.log("debug", "Sending <r> from %s - #queue=%d", reason, queue:count_unacked());
159 session.awaiting_ack = true; 186 session.awaiting_ack = true;
160 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 187 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
161 if session.destroyed then return end -- sending something can trigger destruction 188 if session.destroyed then return end -- sending something can trigger destruction
162 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) 189 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
163 session.last_requested_h = queue:count_acked() + queue:count_unacked(); 190 session.last_requested_h = queue:count_acked() + queue:count_unacked();
164 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
165 if not session.delayed_ack_timer then 191 if not session.delayed_ack_timer then
166 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() 192 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
167 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue 193 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
168 end); 194 end);
169 end 195 end
178 local function outgoing_stanza_filter(stanza, session) 204 local function outgoing_stanza_filter(stanza, session)
179 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's 205 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's
180 -- supposed to be nil. 206 -- supposed to be nil.
181 -- However, when using mod_smacks with mod_websocket, then mod_websocket's 207 -- However, when using mod_smacks with mod_websocket, then mod_websocket's
182 -- stanzas/out filter can get called before this one and adds the xmlns. 208 -- stanzas/out filter can get called before this one and adds the xmlns.
183 if session.resending_unacked then return stanza end
184 if not session.smacks then return stanza end 209 if not session.smacks then return stanza end
185 local is_stanza = st.is_stanza(stanza) and 210 local is_stanza = st.is_stanza(stanza) and
186 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') 211 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client')
187 and not stanza.name:find":"; 212 and not stanza.name:find":";
188 213
232 module:hook("pre-session-close", function(event) 257 module:hook("pre-session-close", function(event)
233 local session = event.session; 258 local session = event.session;
234 if session.smacks == nil then return end 259 if session.smacks == nil then return end
235 if session.resumption_token then 260 if session.resumption_token then
236 session.log("debug", "Revoking resumption token"); 261 session.log("debug", "Revoking resumption token");
237 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; 262 clear_old_session(session);
238 old_session_registry:set(session.username, session.resumption_token, nil);
239 session.resumption_token = nil; 263 session.resumption_token = nil;
240 else 264 else
241 session.log("debug", "Session not resumable"); 265 session.log("debug", "Session not resumable");
242 end 266 end
243 if session.hibernating_watchdog then 267 if session.hibernating_watchdog then
272 wrap_session_out(session, resume); 296 wrap_session_out(session, resume);
273 wrap_session_in(session, resume); 297 wrap_session_in(session, resume);
274 return session; 298 return session;
275 end 299 end
276 300
277 function handle_enable(session, stanza, xmlns_sm) 301 function do_enable(session, stanza)
278 local ok, err, err_text = can_do_smacks(session); 302 local ok, err = can_do_smacks(session);
279 if not ok then 303 if not ok then
280 session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it? 304 session.log("warn", "Failed to enable smacks: %s", err.text); -- TODO: XEP doesn't say we can send error text, should it?
281 (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); 305 return nil, err;
282 return true;
283 end 306 end
284 307
285 if session.username then 308 if session.username then
286 local old_sessions, err = all_old_sessions:get(session.username); 309 local old_sessions, err = all_old_sessions:get(session.username);
287 module:log("debug", "Old sessions: %q", old_sessions) 310 session.log("debug", "Old sessions: %q", old_sessions)
288 if old_sessions then 311 if old_sessions then
289 local keep, count = {}, 0; 312 local keep, count = {}, 0;
290 for token, info in it.sorted_pairs(old_sessions, function(a, b) 313 for token, info in it.sorted_pairs(old_sessions, function(a, b)
291 return (old_sessions[a].t or 0) > (old_sessions[b].t or 0); 314 return (old_sessions[a].t or 0) > (old_sessions[b].t or 0);
292 end) do 315 end) do
294 if count > max_old_sessions then break end 317 if count > max_old_sessions then break end
295 keep[token] = info; 318 keep[token] = info;
296 end 319 end
297 all_old_sessions:set(session.username, keep); 320 all_old_sessions:set(session.username, keep);
298 elseif err then 321 elseif err then
299 module:log("error", "Unable to retrieve old resumption counters: %s", err); 322 session.log("error", "Unable to retrieve old resumption counters: %s", err);
300 end 323 end
301 end 324 end
302 325
303 module:log("debug", "Enabling stream management");
304 session.smacks = xmlns_sm;
305
306 wrap_session(session, false);
307
308 local resume_max;
309 local resume_token; 326 local resume_token;
310 local resume = stanza.attr.resume; 327 local resume = stanza.attr.resume;
311 if (resume == "true" or resume == "1") and session.username then 328 if (resume == "true" or resume == "1") and session.username then
312 -- resumption on s2s is not currently supported 329 -- resumption on s2s is not currently supported
313 resume_token = new_id(); 330 resume_token = new_id();
314 session_registry[jid.join(session.username, session.host, resume_token)] = session; 331 end
315 session.resumption_token = resume_token; 332
316 resume_max = tostring(resume_timeout); 333 return {
317 end 334 type = "enabled";
318 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = resume_max })); 335 id = resume_token;
336 resume_max = resume_token and tostring(resume_timeout) or nil;
337 session = session;
338 finish = function ()
339 session.log("debug", "Enabling stream management");
340
341 session.smacks = stanza.attr.xmlns;
342 if resume_token then
343 track_session(session, resume_token);
344 end
345 wrap_session(session, false);
346 end;
347 };
348 end
349
350 function handle_enable(session, stanza, xmlns_sm)
351 local enabled, err = do_enable(session, stanza);
352 if not enabled then
353 (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):add_error(err));
354 return true;
355 end
356
357 (session.sends2s or session.send)(st.stanza("enabled", {
358 xmlns = xmlns_sm;
359 id = enabled.id;
360 resume = enabled.id and "true" or nil; -- COMPAT w/ Conversations 2.10.10 requires 'true' not '1'
361 max = enabled.resume_max;
362 }));
363
364 session.smacks = xmlns_sm;
365 enabled.finish();
366
319 return true; 367 return true;
320 end 368 end
321 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); 369 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
322 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); 370 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
323 371
324 module:hook_tag("http://etherx.jabber.org/streams", "features", 372 module:hook_tag("http://etherx.jabber.org/streams", "features", function(session, stanza)
325 function (session, stanza) 373 if can_do_smacks(session) then
326 -- Needs to be done after flushing sendq since those aren't stored as 374 session.smacks_feature = stanza:get_child("sm", xmlns_sm3) or stanza:get_child("sm", xmlns_sm2);
327 -- stanzas and counting them is weird. 375 end
328 -- TODO unify sendq and smqueue 376 end);
329 timer.add_task(1e-6, function () 377
330 if can_do_smacks(session) then 378 module:hook("s2sout-established", function (event)
331 if stanza:get_child("sm", xmlns_sm3) then 379 local session = event.session;
332 session.sends2s(st.stanza("enable", sm3_attr)); 380 if not session.smacks_feature then return end
333 session.smacks = xmlns_sm3; 381
334 elseif stanza:get_child("sm", xmlns_sm2) then 382 session.smacks = session.smacks_feature.attr.xmlns;
335 session.sends2s(st.stanza("enable", sm2_attr)); 383 wrap_session_out(session, false);
336 session.smacks = xmlns_sm2; 384 session.sends2s(st.stanza("enable", { xmlns = session.smacks }));
337 else 385 end);
338 return;
339 end
340 wrap_session_out(session, false);
341 end
342 end);
343 end);
344 386
345 function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza 387 function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
346 module:log("debug", "Enabling stream management"); 388 session.log("debug", "Enabling stream management");
347 session.smacks = xmlns_sm; 389 session.smacks = xmlns_sm;
348 390
349 wrap_session_in(session, false); 391 wrap_session_in(session, false);
350 392
351 -- FIXME Resume? 393 -- FIXME Resume?
355 module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); 397 module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
356 module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); 398 module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
357 399
358 function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza 400 function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
359 if not origin.smacks then 401 if not origin.smacks then
360 module:log("debug", "Received ack request from non-smack-enabled session"); 402 origin.log("debug", "Received ack request from non-smack-enabled session");
361 return; 403 return;
362 end 404 end
363 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); 405 origin.log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
364 -- Reply with <a> 406 -- Reply with <a>
365 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = format_h(origin.handled_stanza_count) })); 407 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = format_h(origin.handled_stanza_count) }));
366 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) 408 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
367 request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil); 409 request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil);
368 return true; 410 return true;
411 453
412 local function handle_unacked_stanzas(session) 454 local function handle_unacked_stanzas(session)
413 local queue = session.outgoing_stanza_queue; 455 local queue = session.outgoing_stanza_queue;
414 local unacked = queue:count_unacked() 456 local unacked = queue:count_unacked()
415 if unacked > 0 then 457 if unacked > 0 then
458 local error_from = jid.join(session.username, session.host or module.host);
416 tx_dropped_stanzas:sample(unacked); 459 tx_dropped_stanzas:sample(unacked);
417 session.smacks = false; -- Disable queueing 460 session.smacks = false; -- Disable queueing
418 session.outgoing_stanza_queue = nil; 461 session.outgoing_stanza_queue = nil;
419 for stanza in queue._queue:consume() do 462 for stanza in queue._queue:consume() do
420 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then 463 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then
421 if stanza.attr.type ~= "error" and stanza.attr.from ~= session.full_jid then 464 if stanza.attr.type ~= "error" and stanza.attr.from ~= session.full_jid then
422 local reply = st.error_reply(stanza, "cancel", "recipient-unavailable"); 465 local reply = st.error_reply(stanza, "cancel", "recipient-unavailable", nil, error_from);
423 module:send(reply); 466 module:send(reply);
424 end 467 end
425 end 468 end
426 end 469 end
427 end 470 end
484 session.log("debug", "The session has already been resumed or replaced"); 527 session.log("debug", "The session has already been resumed or replaced");
485 return 528 return
486 end 529 end
487 530
488 session.log("debug", "Destroying session for hibernating too long"); 531 session.log("debug", "Destroying session for hibernating too long");
489 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; 532 save_old_session(session);
490 old_session_registry:set(session.username, session.resumption_token,
491 { h = session.handled_stanza_count; t = os.time() });
492 session.resumption_token = nil; 533 session.resumption_token = nil;
493 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
494 sessionmanager.destroy_session(session, "Hibernating too long"); 534 sessionmanager.destroy_session(session, "Hibernating too long");
495 sessions_expired(1); 535 sessions_expired(1);
496 end); 536 end);
497 if session.conn then 537 if session.conn then
498 local conn = session.conn; 538 local conn = session.conn;
521 end 561 end
522 562
523 module:hook("s2sout-destroyed", handle_s2s_destroyed); 563 module:hook("s2sout-destroyed", handle_s2s_destroyed);
524 module:hook("s2sin-destroyed", handle_s2s_destroyed); 564 module:hook("s2sin-destroyed", handle_s2s_destroyed);
525 565
526 local function get_session_id(session) 566 function do_resume(session, stanza)
527 return session.id or (tostring(session):match("[a-f0-9]+$"));
528 end
529
530 function handle_resume(session, stanza, xmlns_sm)
531 if session.full_jid then 567 if session.full_jid then
532 session.log("warn", "Tried to resume after resource binding"); 568 session.log("warn", "Tried to resume after resource binding");
533 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 569 return nil, enable_errors.new("already_bound");
534 :tag("unexpected-request", { xmlns = xmlns_errors })
535 );
536 return true;
537 end 570 end
538 571
539 local id = stanza.attr.previd; 572 local id = stanza.attr.previd;
540 local original_session = session_registry[jid.join(session.username, session.host, id)]; 573 local original_session = session_registry[jid.join(session.username, session.host, id)];
541 if not original_session then 574 if not original_session then
542 local old_session = old_session_registry:get(session.username, id); 575 local old_session = old_session_registry:get(session.username, id);
543 if old_session then 576 if old_session then
544 session.log("debug", "Tried to resume old expired session with id %s", id); 577 session.log("debug", "Tried to resume old expired session with id %s", id);
545 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) 578 clear_old_session(session, id);
546 :tag("item-not-found", { xmlns = xmlns_errors })
547 );
548 old_session_registry:set(session.username, id, nil);
549 resumption_expired(1); 579 resumption_expired(1);
550 else 580 return nil, enable_errors.new("expired", { h = old_session.h });
551 session.log("debug", "Tried to resume non-existent session with id %s", id); 581 end
552 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 582 session.log("debug", "Tried to resume non-existent session with id %s", id);
553 :tag("item-not-found", { xmlns = xmlns_errors }) 583 return nil, enable_errors.new("unknown_session");
554 ); 584 end
585
586 if original_session.hibernating_watchdog then
587 original_session.log("debug", "Letting the watchdog go");
588 original_session.hibernating_watchdog:cancel();
589 original_session.hibernating_watchdog = nil;
590 elseif session.hibernating then
591 original_session.log("error", "Hibernating session has no watchdog!")
592 end
593 -- zero age = was not hibernating yet
594 local age = 0;
595 if original_session.hibernating then
596 local now = os_time();
597 age = now - original_session.hibernating;
598 end
599
600 session.log("debug", "mod_smacks resuming existing session %s...", original_session.id);
601
602 local queue = original_session.outgoing_stanza_queue;
603 local h = tonumber(stanza.attr.h);
604
605 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
606 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
607
608 if not err and not queue:resumable() then
609 err = ack_errors.new("overflow");
610 end
611
612 if err then
613 session.log("debug", "Resumption failed: %s", err);
614 return nil, err;
615 end
616
617 -- Update original_session with the parameters (connection, etc.) from the new session
618 sessionmanager.update_session(original_session, session);
619
620 return {
621 type = "resumed";
622 session = original_session;
623 id = id;
624 -- Return function to complete the resumption and resync unacked stanzas
625 -- This is two steps so we can support SASL2/ISR
626 finish = function ()
627 -- Ok, we need to re-send any stanzas that the client didn't see
628 -- ...they are what is now left in the outgoing stanza queue
629 -- We have to use the send of "session" because we don't want to add our resent stanzas
630 -- to the outgoing queue again
631
632 original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
633 for _, queued_stanza in queue:resume() do
634 original_session.send(queued_stanza);
635 end
636 original_session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked());
637
638 -- Add our own handlers to the resumed session (filters have been reset in the update)
639 wrap_session(original_session, true);
640
641 -- Let everyone know that we are no longer hibernating
642 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
643 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
644 request_ack_now_if_needed(original_session, true, "handle_resume", nil);
645 resumption_age:sample(age);
555 end; 646 end;
556 else 647 };
557 if original_session.hibernating_watchdog then 648 end
558 original_session.log("debug", "Letting the watchdog go"); 649
559 original_session.hibernating_watchdog:cancel(); 650 function handle_resume(session, stanza, xmlns_sm)
560 original_session.hibernating_watchdog = nil; 651 local resumed, err = do_resume(session, stanza);
561 elseif session.hibernating then 652 if not resumed then
562 original_session.log("error", "Hibernating session has no watchdog!") 653 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(err.context.h) })
563 end 654 :tag(err.condition, { xmlns = xmlns_errors }));
564 -- zero age = was not hibernating yet 655 return true;
565 local age = 0; 656 end
566 if original_session.hibernating then 657
567 local now = os_time(); 658 session = resumed.session;
568 age = now - original_session.hibernating; 659
569 end 660 -- Inform client of successful resumption
570 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); 661 session.send(st.stanza("resumed", { xmlns = xmlns_sm,
571 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); 662 h = format_h(session.handled_stanza_count), previd = resumed.id }));
572 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) 663
573 if original_session.conn then 664 -- Complete resume (sync stanzas, etc.)
574 original_session.log("debug", "mod_smacks closing an old connection for this session"); 665 resumed.finish();
575 local conn = original_session.conn; 666
576 c2s_sessions[conn] = nil;
577 conn:close();
578 end
579
580 local migrated_session_log = session.log;
581 original_session.ip = session.ip;
582 original_session.conn = session.conn;
583 original_session.rawsend = session.rawsend;
584 original_session.rawsend.session = original_session;
585 original_session.rawsend.conn = original_session.conn;
586 original_session.send = session.send;
587 original_session.send.session = original_session;
588 original_session.close = session.close;
589 original_session.filter = session.filter;
590 original_session.filter.session = original_session;
591 original_session.filters = session.filters;
592 original_session.send.filter = original_session.filter;
593 original_session.stream = session.stream;
594 original_session.secure = session.secure;
595 original_session.hibernating = nil;
596 original_session.resumption_counter = (original_session.resumption_counter or 0) + 1;
597 session.log = original_session.log;
598 session.type = original_session.type;
599 wrap_session(original_session, true);
600 -- Inform xmppstream of the new session (passed to its callbacks)
601 original_session.stream:set_session(original_session);
602 -- Similar for connlisteners
603 c2s_sessions[session.conn] = original_session;
604
605 local queue = original_session.outgoing_stanza_queue;
606 local h = tonumber(stanza.attr.h);
607
608 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
609 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
610
611 if not err and not queue:resumable() then
612 err = ack_errors.new("overflow");
613 end
614
615 if err or not queue:resumable() then
616 original_session.send(st.stanza("failed",
617 { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id }));
618 original_session:close(err);
619 return false;
620 end
621
622 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm,
623 h = format_h(original_session.handled_stanza_count), previd = id }));
624
625 -- Ok, we need to re-send any stanzas that the client didn't see
626 -- ...they are what is now left in the outgoing stanza queue
627 -- We have to use the send of "session" because we don't want to add our resent stanzas
628 -- to the outgoing queue again
629
630 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
631 -- FIXME Which session is it that the queue filter sees?
632 session.resending_unacked = true;
633 original_session.resending_unacked = true;
634 for _, queued_stanza in queue:resume() do
635 session.send(queued_stanza);
636 end
637 session.resending_unacked = nil;
638 original_session.resending_unacked = nil;
639 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked());
640 function session.send(stanza) -- luacheck: ignore 432
641 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
642 return false;
643 end
644 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
645 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
646 request_ack_now_if_needed(original_session, true, "handle_resume", nil);
647 resumption_age:sample(age);
648 end
649 return true; 667 return true;
650 end 668 end
669
651 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); 670 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
652 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); 671 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
653 672
654 -- Events when it's sensible to request an ack 673 -- Events when it's sensible to request an ack
655 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? 674 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why?
700 -- counter value, so it can be communicated to the client when it tries to 719 -- counter value, so it can be communicated to the client when it tries to
701 -- resume the lost session after a restart. 720 -- resume the lost session after a restart.
702 for _, user in pairs(local_sessions) do 721 for _, user in pairs(local_sessions) do
703 for _, session in pairs(user.sessions) do 722 for _, session in pairs(user.sessions) do
704 if session.resumption_token then 723 if session.resumption_token then
705 if old_session_registry:set(session.username, session.resumption_token, 724 if save_old_session(session) then
706 { h = session.handled_stanza_count; t = os.time() }) then
707 session.resumption_token = nil; 725 session.resumption_token = nil;
708 726
709 -- Deal with unacked stanzas 727 -- Deal with unacked stanzas
710 if session.outgoing_stanza_queue then 728 if session.outgoing_stanza_queue then
711 handle_unacked_stanzas(session); 729 handle_unacked_stanzas(session);