Software / code / prosody
Comparison
plugins/mod_smacks.lua @ 12802:4a8740e01813
Merge 0.12->trunk
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Mon, 12 Dec 2022 07:10:54 +0100 |
| parent | 12800:06ba2f8cee47 |
| child | 12803:2e12290820e8 |
Comparison legend: equal · deleted · inserted · replaced
| 12801:ebd6b4d8bf04 | 12802:4a8740e01813 |
|---|---|
| 1 -- XEP-0198: Stream Management for Prosody IM | 1 -- XEP-0198: Stream Management for Prosody IM |
| 2 -- | 2 -- |
| 3 -- Copyright (C) 2010-2015 Matthew Wild | 3 -- Copyright (C) 2010-2015 Matthew Wild |
| 4 -- Copyright (C) 2010 Waqas Hussain | 4 -- Copyright (C) 2010 Waqas Hussain |
| 5 -- Copyright (C) 2012-2021 Kim Alvefur | 5 -- Copyright (C) 2012-2022 Kim Alvefur |
| 6 -- Copyright (C) 2012 Thijs Alkemade | 6 -- Copyright (C) 2012 Thijs Alkemade |
| 7 -- Copyright (C) 2014 Florian Zeitz | 7 -- Copyright (C) 2014 Florian Zeitz |
| 8 -- Copyright (C) 2016-2020 Thilo Molitor | 8 -- Copyright (C) 2016-2020 Thilo Molitor |
| 9 -- | 9 -- |
| 10 -- This project is MIT/X11 licensed. Please see the | 10 -- This project is MIT/X11 licensed. Please see the |
| 11 -- COPYING file in the source package for more information. | 11 -- COPYING file in the source package for more information. |
| 12 -- | 12 -- |
| 13 -- TODO unify sendq and smqueue | |
| 13 | 14 |
| 14 local tonumber = tonumber; | 15 local tonumber = tonumber; |
| 15 local tostring = tostring; | 16 local tostring = tostring; |
| 16 local os_time = os.time; | 17 local os_time = os.time; |
| 17 | 18 |
| 81 | 82 |
| 82 local all_old_sessions = module:open_store("smacks_h"); | 83 local all_old_sessions = module:open_store("smacks_h"); |
| 83 local old_session_registry = module:open_store("smacks_h", "map"); | 84 local old_session_registry = module:open_store("smacks_h", "map"); |
| 84 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource | 85 local session_registry = module:shared "/*/smacks/resumption-tokens"; -- > user@host/resumption-token --> resource |
| 85 | 86 |
| 87 local function track_session(session, id) | |
| 88 session_registry[jid.join(session.username, session.host, id or session.resumption_token)] = session; | |
| 89 session.resumption_token = id; | |
| 90 end | |
| 91 | |
| 92 local function save_old_session(session) | |
| 93 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; | |
| 94 return old_session_registry:set(session.username, session.resumption_token, | |
| 95 { h = session.handled_stanza_count; t = os.time() }) | |
| 96 end | |
| 97 | |
| 98 local function clear_old_session(session, id) | |
| 99 session_registry[jid.join(session.username, session.host, id or session.resumption_token)] = nil; | |
| 100 return old_session_registry:set(session.username, id or session.resumption_token, nil) | |
| 101 end | |
| 102 | |
| 86 local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, { | 103 local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, { |
| 87 head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" }; | 104 head = { condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server" }; |
| 88 tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" }; | 105 tail = { condition = "undefined-condition"; text = "Client acknowledged less stanzas than already acknowledged" }; |
| 89 pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" }; | 106 pop = { condition = "internal-server-error"; text = "Something went wrong with Stream Management" }; |
| 90 overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" } | 107 overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" } |
| 108 }); | |
| 109 | |
| 110 local enable_errors = require "util.error".init("mod_smacks", xmlns_sm3, { | |
| 111 already_enabled = { condition = "unexpected-request", text = "Stream management is already enabled" }; | |
| 112 bind_required = { condition = "unexpected-request", text = "Client must bind a resource before enabling stream management" }; | |
| 113 unavailable = { condition = "service-unavailable", text = "Stream management is not available for this stream" }; | |
| 114 -- Resumption | |
| 115 expired = { condition = "item-not-found", text = "Session expired, and cannot be resumed" }; | |
| 116 already_bound = { condition = "unexpected-request", text = "Cannot resume another session after a resource is bound" }; | |
| 117 unknown_session = { condition = "item-not-found", text = "Unknown session" }; | |
| 91 }); | 118 }); |
| 92 | 119 |
| 93 -- COMPAT note the use of compatibility wrapper in events (queue:table()) | 120 -- COMPAT note the use of compatibility wrapper in events (queue:table()) |
| 94 | 121 |
| 95 local function ack_delayed(session, stanza) | 122 local function ack_delayed(session, stanza) |
| 102 end | 129 end |
| 103 session.delayed_ack_timer = nil; | 130 session.delayed_ack_timer = nil; |
| 104 end | 131 end |
| 105 | 132 |
| 106 local function can_do_smacks(session, advertise_only) | 133 local function can_do_smacks(session, advertise_only) |
| 107 if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end | 134 if session.smacks then return false, enable_errors.new("already_enabled"); end |
| 108 | 135 |
| 109 local session_type = session.type; | 136 local session_type = session.type; |
| 110 if session.username then | 137 if session.username then |
| 111 if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm | 138 if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm |
| 112 return false, "unexpected-request", "Client must bind a resource before enabling stream management"; | 139 return false, enable_errors.new("bind_required"); |
| 113 end | 140 end |
| 114 return true; | 141 return true; |
| 115 elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then | 142 elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then |
| 116 return true; | 143 return true; |
| 117 end | 144 end |
| 118 return false, "service-unavailable", "Stream management is not available for this stream"; | 145 return false, enable_errors.new("unavailable"); |
| 119 end | 146 end |
| 120 | 147 |
| 121 module:hook("stream-features", | 148 module:hook("stream-features", |
| 122 function (event) | 149 function (event) |
| 123 if can_do_smacks(event.origin, true) then | 150 if can_do_smacks(event.origin, true) then |
| 153 return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h; | 180 return queue:count_unacked() > max_unacked and expected_h ~= session.last_requested_h; |
| 154 end | 181 end |
| 155 | 182 |
| 156 local function request_ack(session, reason) | 183 local function request_ack(session, reason) |
| 157 local queue = session.outgoing_stanza_queue; | 184 local queue = session.outgoing_stanza_queue; |
| 158 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); | 185 session.log("debug", "Sending <r> from %s - #queue=%d", reason, queue:count_unacked()); |
| 159 session.awaiting_ack = true; | 186 session.awaiting_ack = true; |
| 160 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 187 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
| 161 if session.destroyed then return end -- sending something can trigger destruction | 188 if session.destroyed then return end -- sending something can trigger destruction |
| 162 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) | 189 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
| 163 session.last_requested_h = queue:count_acked() + queue:count_unacked(); | 190 session.last_requested_h = queue:count_acked() + queue:count_unacked(); |
| 164 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked()); | |
| 165 if not session.delayed_ack_timer then | 191 if not session.delayed_ack_timer then |
| 166 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() | 192 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
| 167 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue | 193 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
| 168 end); | 194 end); |
| 169 end | 195 end |
| 178 local function outgoing_stanza_filter(stanza, session) | 204 local function outgoing_stanza_filter(stanza, session) |
| 179 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's | 205 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's |
| 180 -- supposed to be nil. | 206 -- supposed to be nil. |
| 181 -- However, when using mod_smacks with mod_websocket, then mod_websocket's | 207 -- However, when using mod_smacks with mod_websocket, then mod_websocket's |
| 182 -- stanzas/out filter can get called before this one and adds the xmlns. | 208 -- stanzas/out filter can get called before this one and adds the xmlns. |
| 183 if session.resending_unacked then return stanza end | |
| 184 if not session.smacks then return stanza end | 209 if not session.smacks then return stanza end |
| 185 local is_stanza = st.is_stanza(stanza) and | 210 local is_stanza = st.is_stanza(stanza) and |
| 186 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') | 211 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') |
| 187 and not stanza.name:find":"; | 212 and not stanza.name:find":"; |
| 188 | 213 |
| 232 module:hook("pre-session-close", function(event) | 257 module:hook("pre-session-close", function(event) |
| 233 local session = event.session; | 258 local session = event.session; |
| 234 if session.smacks == nil then return end | 259 if session.smacks == nil then return end |
| 235 if session.resumption_token then | 260 if session.resumption_token then |
| 236 session.log("debug", "Revoking resumption token"); | 261 session.log("debug", "Revoking resumption token"); |
| 237 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; | 262 clear_old_session(session); |
| 238 old_session_registry:set(session.username, session.resumption_token, nil); | |
| 239 session.resumption_token = nil; | 263 session.resumption_token = nil; |
| 240 else | 264 else |
| 241 session.log("debug", "Session not resumable"); | 265 session.log("debug", "Session not resumable"); |
| 242 end | 266 end |
| 243 if session.hibernating_watchdog then | 267 if session.hibernating_watchdog then |
| 272 wrap_session_out(session, resume); | 296 wrap_session_out(session, resume); |
| 273 wrap_session_in(session, resume); | 297 wrap_session_in(session, resume); |
| 274 return session; | 298 return session; |
| 275 end | 299 end |
| 276 | 300 |
| 277 function handle_enable(session, stanza, xmlns_sm) | 301 function do_enable(session, stanza) |
| 278 local ok, err, err_text = can_do_smacks(session); | 302 local ok, err = can_do_smacks(session); |
| 279 if not ok then | 303 if not ok then |
| 280 session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it? | 304 session.log("warn", "Failed to enable smacks: %s", err.text); -- TODO: XEP doesn't say we can send error text, should it? |
| 281 (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); | 305 return nil, err; |
| 282 return true; | |
| 283 end | 306 end |
| 284 | 307 |
| 285 if session.username then | 308 if session.username then |
| 286 local old_sessions, err = all_old_sessions:get(session.username); | 309 local old_sessions, err = all_old_sessions:get(session.username); |
| 287 module:log("debug", "Old sessions: %q", old_sessions) | 310 session.log("debug", "Old sessions: %q", old_sessions) |
| 288 if old_sessions then | 311 if old_sessions then |
| 289 local keep, count = {}, 0; | 312 local keep, count = {}, 0; |
| 290 for token, info in it.sorted_pairs(old_sessions, function(a, b) | 313 for token, info in it.sorted_pairs(old_sessions, function(a, b) |
| 291 return (old_sessions[a].t or 0) > (old_sessions[b].t or 0); | 314 return (old_sessions[a].t or 0) > (old_sessions[b].t or 0); |
| 292 end) do | 315 end) do |
| 294 if count > max_old_sessions then break end | 317 if count > max_old_sessions then break end |
| 295 keep[token] = info; | 318 keep[token] = info; |
| 296 end | 319 end |
| 297 all_old_sessions:set(session.username, keep); | 320 all_old_sessions:set(session.username, keep); |
| 298 elseif err then | 321 elseif err then |
| 299 module:log("error", "Unable to retrieve old resumption counters: %s", err); | 322 session.log("error", "Unable to retrieve old resumption counters: %s", err); |
| 300 end | 323 end |
| 301 end | 324 end |
| 302 | 325 |
| 303 module:log("debug", "Enabling stream management"); | |
| 304 session.smacks = xmlns_sm; | |
| 305 | |
| 306 wrap_session(session, false); | |
| 307 | |
| 308 local resume_max; | |
| 309 local resume_token; | 326 local resume_token; |
| 310 local resume = stanza.attr.resume; | 327 local resume = stanza.attr.resume; |
| 311 if (resume == "true" or resume == "1") and session.username then | 328 if (resume == "true" or resume == "1") and session.username then |
| 312 -- resumption on s2s is not currently supported | 329 -- resumption on s2s is not currently supported |
| 313 resume_token = new_id(); | 330 resume_token = new_id(); |
| 314 session_registry[jid.join(session.username, session.host, resume_token)] = session; | 331 end |
| 315 session.resumption_token = resume_token; | 332 |
| 316 resume_max = tostring(resume_timeout); | 333 return { |
| 317 end | 334 type = "enabled"; |
| 318 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = resume_max })); | 335 id = resume_token; |
| 336 resume_max = resume_token and tostring(resume_timeout) or nil; | |
| 337 session = session; | |
| 338 finish = function () | |
| 339 session.log("debug", "Enabling stream management"); | |
| 340 | |
| 341 session.smacks = stanza.attr.xmlns; | |
| 342 if resume_token then | |
| 343 track_session(session, resume_token); | |
| 344 end | |
| 345 wrap_session(session, false); | |
| 346 end; | |
| 347 }; | |
| 348 end | |
| 349 | |
| 350 function handle_enable(session, stanza, xmlns_sm) | |
| 351 local enabled, err = do_enable(session, stanza); | |
| 352 if not enabled then | |
| 353 (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):add_error(err)); | |
| 354 return true; | |
| 355 end | |
| 356 | |
| 357 (session.sends2s or session.send)(st.stanza("enabled", { | |
| 358 xmlns = xmlns_sm; | |
| 359 id = enabled.id; | |
| 360 resume = enabled.id and "true" or nil; -- COMPAT w/ Conversations 2.10.10 requires 'true' not '1' | |
| 361 max = enabled.resume_max; | |
| 362 })); | |
| 363 | |
| 364 session.smacks = xmlns_sm; | |
| 365 enabled.finish(); | |
| 366 | |
| 319 return true; | 367 return true; |
| 320 end | 368 end |
| 321 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); | 369 module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); |
| 322 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); | 370 module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); |
| 323 | 371 |
| 324 module:hook_tag("http://etherx.jabber.org/streams", "features", | 372 module:hook_tag("http://etherx.jabber.org/streams", "features", function(session, stanza) |
| 325 function (session, stanza) | 373 if can_do_smacks(session) then |
| 326 -- Needs to be done after flushing sendq since those aren't stored as | 374 session.smacks_feature = stanza:get_child("sm", xmlns_sm3) or stanza:get_child("sm", xmlns_sm2); |
| 327 -- stanzas and counting them is weird. | 375 end |
| 328 -- TODO unify sendq and smqueue | 376 end); |
| 329 timer.add_task(1e-6, function () | 377 |
| 330 if can_do_smacks(session) then | 378 module:hook("s2sout-established", function (event) |
| 331 if stanza:get_child("sm", xmlns_sm3) then | 379 local session = event.session; |
| 332 session.sends2s(st.stanza("enable", sm3_attr)); | 380 if not session.smacks_feature then return end |
| 333 session.smacks = xmlns_sm3; | 381 |
| 334 elseif stanza:get_child("sm", xmlns_sm2) then | 382 session.smacks = session.smacks_feature.attr.xmlns; |
| 335 session.sends2s(st.stanza("enable", sm2_attr)); | 383 wrap_session_out(session, false); |
| 336 session.smacks = xmlns_sm2; | 384 session.sends2s(st.stanza("enable", { xmlns = session.smacks })); |
| 337 else | 385 end); |
| 338 return; | |
| 339 end | |
| 340 wrap_session_out(session, false); | |
| 341 end | |
| 342 end); | |
| 343 end); | |
| 344 | 386 |
| 345 function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza | 387 function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza |
| 346 module:log("debug", "Enabling stream management"); | 388 session.log("debug", "Enabling stream management"); |
| 347 session.smacks = xmlns_sm; | 389 session.smacks = xmlns_sm; |
| 348 | 390 |
| 349 wrap_session_in(session, false); | 391 wrap_session_in(session, false); |
| 350 | 392 |
| 351 -- FIXME Resume? | 393 -- FIXME Resume? |
| 355 module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); | 397 module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); |
| 356 module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); | 398 module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); |
| 357 | 399 |
| 358 function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza | 400 function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza |
| 359 if not origin.smacks then | 401 if not origin.smacks then |
| 360 module:log("debug", "Received ack request from non-smack-enabled session"); | 402 origin.log("debug", "Received ack request from non-smack-enabled session"); |
| 361 return; | 403 return; |
| 362 end | 404 end |
| 363 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); | 405 origin.log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); |
| 364 -- Reply with <a> | 406 -- Reply with <a> |
| 365 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = format_h(origin.handled_stanza_count) })); | 407 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = format_h(origin.handled_stanza_count) })); |
| 366 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) | 408 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) |
| 367 request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil); | 409 request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil); |
| 368 return true; | 410 return true; |
| 411 | 453 |
| 412 local function handle_unacked_stanzas(session) | 454 local function handle_unacked_stanzas(session) |
| 413 local queue = session.outgoing_stanza_queue; | 455 local queue = session.outgoing_stanza_queue; |
| 414 local unacked = queue:count_unacked() | 456 local unacked = queue:count_unacked() |
| 415 if unacked > 0 then | 457 if unacked > 0 then |
| 458 local error_from = jid.join(session.username, session.host or module.host); | |
| 416 tx_dropped_stanzas:sample(unacked); | 459 tx_dropped_stanzas:sample(unacked); |
| 417 session.smacks = false; -- Disable queueing | 460 session.smacks = false; -- Disable queueing |
| 418 session.outgoing_stanza_queue = nil; | 461 session.outgoing_stanza_queue = nil; |
| 419 for stanza in queue._queue:consume() do | 462 for stanza in queue._queue:consume() do |
| 420 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then | 463 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then |
| 421 if stanza.attr.type ~= "error" and stanza.attr.from ~= session.full_jid then | 464 if stanza.attr.type ~= "error" and stanza.attr.from ~= session.full_jid then |
| 422 local reply = st.error_reply(stanza, "cancel", "recipient-unavailable"); | 465 local reply = st.error_reply(stanza, "cancel", "recipient-unavailable", nil, error_from); |
| 423 module:send(reply); | 466 module:send(reply); |
| 424 end | 467 end |
| 425 end | 468 end |
| 426 end | 469 end |
| 427 end | 470 end |
| 484 session.log("debug", "The session has already been resumed or replaced"); | 527 session.log("debug", "The session has already been resumed or replaced"); |
| 485 return | 528 return |
| 486 end | 529 end |
| 487 | 530 |
| 488 session.log("debug", "Destroying session for hibernating too long"); | 531 session.log("debug", "Destroying session for hibernating too long"); |
| 489 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; | 532 save_old_session(session); |
| 490 old_session_registry:set(session.username, session.resumption_token, | |
| 491 { h = session.handled_stanza_count; t = os.time() }); | |
| 492 session.resumption_token = nil; | 533 session.resumption_token = nil; |
| 493 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore | |
| 494 sessionmanager.destroy_session(session, "Hibernating too long"); | 534 sessionmanager.destroy_session(session, "Hibernating too long"); |
| 495 sessions_expired(1); | 535 sessions_expired(1); |
| 496 end); | 536 end); |
| 497 if session.conn then | 537 if session.conn then |
| 498 local conn = session.conn; | 538 local conn = session.conn; |
| 521 end | 561 end |
| 522 | 562 |
| 523 module:hook("s2sout-destroyed", handle_s2s_destroyed); | 563 module:hook("s2sout-destroyed", handle_s2s_destroyed); |
| 524 module:hook("s2sin-destroyed", handle_s2s_destroyed); | 564 module:hook("s2sin-destroyed", handle_s2s_destroyed); |
| 525 | 565 |
| 526 local function get_session_id(session) | 566 function do_resume(session, stanza) |
| 527 return session.id or (tostring(session):match("[a-f0-9]+$")); | |
| 528 end | |
| 529 | |
| 530 function handle_resume(session, stanza, xmlns_sm) | |
| 531 if session.full_jid then | 567 if session.full_jid then |
| 532 session.log("warn", "Tried to resume after resource binding"); | 568 session.log("warn", "Tried to resume after resource binding"); |
| 533 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 569 return nil, enable_errors.new("already_bound"); |
| 534 :tag("unexpected-request", { xmlns = xmlns_errors }) | |
| 535 ); | |
| 536 return true; | |
| 537 end | 570 end |
| 538 | 571 |
| 539 local id = stanza.attr.previd; | 572 local id = stanza.attr.previd; |
| 540 local original_session = session_registry[jid.join(session.username, session.host, id)]; | 573 local original_session = session_registry[jid.join(session.username, session.host, id)]; |
| 541 if not original_session then | 574 if not original_session then |
| 542 local old_session = old_session_registry:get(session.username, id); | 575 local old_session = old_session_registry:get(session.username, id); |
| 543 if old_session then | 576 if old_session then |
| 544 session.log("debug", "Tried to resume old expired session with id %s", id); | 577 session.log("debug", "Tried to resume old expired session with id %s", id); |
| 545 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) | 578 clear_old_session(session, id); |
| 546 :tag("item-not-found", { xmlns = xmlns_errors }) | |
| 547 ); | |
| 548 old_session_registry:set(session.username, id, nil); | |
| 549 resumption_expired(1); | 579 resumption_expired(1); |
| 550 else | 580 return nil, enable_errors.new("expired", { h = old_session.h }); |
| 551 session.log("debug", "Tried to resume non-existent session with id %s", id); | 581 end |
| 552 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 582 session.log("debug", "Tried to resume non-existent session with id %s", id); |
| 553 :tag("item-not-found", { xmlns = xmlns_errors }) | 583 return nil, enable_errors.new("unknown_session"); |
| 554 ); | 584 end |
| 585 | |
| 586 if original_session.hibernating_watchdog then | |
| 587 original_session.log("debug", "Letting the watchdog go"); | |
| 588 original_session.hibernating_watchdog:cancel(); | |
| 589 original_session.hibernating_watchdog = nil; | |
| 590 elseif session.hibernating then | |
| 591 original_session.log("error", "Hibernating session has no watchdog!") | |
| 592 end | |
| 593 -- zero age = was not hibernating yet | |
| 594 local age = 0; | |
| 595 if original_session.hibernating then | |
| 596 local now = os_time(); | |
| 597 age = now - original_session.hibernating; | |
| 598 end | |
| 599 | |
| 600 session.log("debug", "mod_smacks resuming existing session %s...", original_session.id); | |
| 601 | |
| 602 local queue = original_session.outgoing_stanza_queue; | |
| 603 local h = tonumber(stanza.attr.h); | |
| 604 | |
| 605 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) | |
| 606 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked | |
| 607 | |
| 608 if not err and not queue:resumable() then | |
| 609 err = ack_errors.new("overflow"); | |
| 610 end | |
| 611 | |
| 612 if err then | |
| 613 session.log("debug", "Resumption failed: %s", err); | |
| 614 return nil, err; | |
| 615 end | |
| 616 | |
| 617 -- Update original_session with the parameters (connection, etc.) from the new session | |
| 618 sessionmanager.update_session(original_session, session); | |
| 619 | |
| 620 return { | |
| 621 type = "resumed"; | |
| 622 session = original_session; | |
| 623 id = id; | |
| 624 -- Return function to complete the resumption and resync unacked stanzas | |
| 625 -- This is two steps so we can support SASL2/ISR | |
| 626 finish = function () | |
| 627 -- Ok, we need to re-send any stanzas that the client didn't see | |
| 628 -- ...they are what is now left in the outgoing stanza queue | |
| 629 -- We have to use the send of "session" because we don't want to add our resent stanzas | |
| 630 -- to the outgoing queue again | |
| 631 | |
| 632 original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); | |
| 633 for _, queued_stanza in queue:resume() do | |
| 634 original_session.send(queued_stanza); | |
| 635 end | |
| 636 original_session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked()); | |
| 637 | |
| 638 -- Add our own handlers to the resumed session (filters have been reset in the update) | |
| 639 wrap_session(original_session, true); | |
| 640 | |
| 641 -- Let everyone know that we are no longer hibernating | |
| 642 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); | |
| 643 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption | |
| 644 request_ack_now_if_needed(original_session, true, "handle_resume", nil); | |
| 645 resumption_age:sample(age); | |
| 555 end; | 646 end; |
| 556 else | 647 }; |
| 557 if original_session.hibernating_watchdog then | 648 end |
| 558 original_session.log("debug", "Letting the watchdog go"); | 649 |
| 559 original_session.hibernating_watchdog:cancel(); | 650 function handle_resume(session, stanza, xmlns_sm) |
| 560 original_session.hibernating_watchdog = nil; | 651 local resumed, err = do_resume(session, stanza); |
| 561 elseif session.hibernating then | 652 if not resumed then |
| 562 original_session.log("error", "Hibernating session has no watchdog!") | 653 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(err.context.h) }) |
| 563 end | 654 :tag(err.condition, { xmlns = xmlns_errors })); |
| 564 -- zero age = was not hibernating yet | 655 return true; |
| 565 local age = 0; | 656 end |
| 566 if original_session.hibernating then | 657 |
| 567 local now = os_time(); | 658 session = resumed.session; |
| 568 age = now - original_session.hibernating; | 659 |
| 569 end | 660 -- Inform client of successful resumption |
| 570 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); | 661 session.send(st.stanza("resumed", { xmlns = xmlns_sm, |
| 571 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); | 662 h = format_h(session.handled_stanza_count), previd = resumed.id })); |
| 572 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | 663 |
| 573 if original_session.conn then | 664 -- Complete resume (sync stanzas, etc.) |
| 574 original_session.log("debug", "mod_smacks closing an old connection for this session"); | 665 resumed.finish(); |
| 575 local conn = original_session.conn; | 666 |
| 576 c2s_sessions[conn] = nil; | |
| 577 conn:close(); | |
| 578 end | |
| 579 | |
| 580 local migrated_session_log = session.log; | |
| 581 original_session.ip = session.ip; | |
| 582 original_session.conn = session.conn; | |
| 583 original_session.rawsend = session.rawsend; | |
| 584 original_session.rawsend.session = original_session; | |
| 585 original_session.rawsend.conn = original_session.conn; | |
| 586 original_session.send = session.send; | |
| 587 original_session.send.session = original_session; | |
| 588 original_session.close = session.close; | |
| 589 original_session.filter = session.filter; | |
| 590 original_session.filter.session = original_session; | |
| 591 original_session.filters = session.filters; | |
| 592 original_session.send.filter = original_session.filter; | |
| 593 original_session.stream = session.stream; | |
| 594 original_session.secure = session.secure; | |
| 595 original_session.hibernating = nil; | |
| 596 original_session.resumption_counter = (original_session.resumption_counter or 0) + 1; | |
| 597 session.log = original_session.log; | |
| 598 session.type = original_session.type; | |
| 599 wrap_session(original_session, true); | |
| 600 -- Inform xmppstream of the new session (passed to its callbacks) | |
| 601 original_session.stream:set_session(original_session); | |
| 602 -- Similar for connlisteners | |
| 603 c2s_sessions[session.conn] = original_session; | |
| 604 | |
| 605 local queue = original_session.outgoing_stanza_queue; | |
| 606 local h = tonumber(stanza.attr.h); | |
| 607 | |
| 608 original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked()) | |
| 609 local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked | |
| 610 | |
| 611 if not err and not queue:resumable() then | |
| 612 err = ack_errors.new("overflow"); | |
| 613 end | |
| 614 | |
| 615 if err or not queue:resumable() then | |
| 616 original_session.send(st.stanza("failed", | |
| 617 { xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id })); | |
| 618 original_session:close(err); | |
| 619 return false; | |
| 620 end | |
| 621 | |
| 622 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, | |
| 623 h = format_h(original_session.handled_stanza_count), previd = id })); | |
| 624 | |
| 625 -- Ok, we need to re-send any stanzas that the client didn't see | |
| 626 -- ...they are what is now left in the outgoing stanza queue | |
| 627 -- We have to use the send of "session" because we don't want to add our resent stanzas | |
| 628 -- to the outgoing queue again | |
| 629 | |
| 630 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked()); | |
| 631 -- FIXME Which session is it that the queue filter sees? | |
| 632 session.resending_unacked = true; | |
| 633 original_session.resending_unacked = true; | |
| 634 for _, queued_stanza in queue:resume() do | |
| 635 session.send(queued_stanza); | |
| 636 end | |
| 637 session.resending_unacked = nil; | |
| 638 original_session.resending_unacked = nil; | |
| 639 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", queue:count_unacked()); | |
| 640 function session.send(stanza) -- luacheck: ignore 432 | |
| 641 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | |
| 642 return false; | |
| 643 end | |
| 644 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); | |
| 645 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption | |
| 646 request_ack_now_if_needed(original_session, true, "handle_resume", nil); | |
| 647 resumption_age:sample(age); | |
| 648 end | |
| 649 return true; | 667 return true; |
| 650 end | 668 end |
| 669 | |
| 651 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 670 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |
| 652 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | 671 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); |
| 653 | 672 |
| 654 -- Events when it's sensible to request an ack | 673 -- Events when it's sensible to request an ack |
| 655 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? | 674 -- Could experiment with forcing (ignoring max_unacked) <r>, but when and why? |
| 700 -- counter value, so it can be communicated to the client when it tries to | 719 -- counter value, so it can be communicated to the client when it tries to |
| 701 -- resume the lost session after a restart. | 720 -- resume the lost session after a restart. |
| 702 for _, user in pairs(local_sessions) do | 721 for _, user in pairs(local_sessions) do |
| 703 for _, session in pairs(user.sessions) do | 722 for _, session in pairs(user.sessions) do |
| 704 if session.resumption_token then | 723 if session.resumption_token then |
| 705 if old_session_registry:set(session.username, session.resumption_token, | 724 if save_old_session(session) then |
| 706 { h = session.handled_stanza_count; t = os.time() }) then | |
| 707 session.resumption_token = nil; | 725 session.resumption_token = nil; |
| 708 | 726 |
| 709 -- Deal with unacked stanzas | 727 -- Deal with unacked stanzas |
| 710 if session.outgoing_stanza_queue then | 728 if session.outgoing_stanza_queue then |
| 711 handle_unacked_stanzas(session); | 729 handle_unacked_stanzas(session); |