Software /
code /
prosody-modules
Changeset
6025:a5fff4995862
mod_smacks: Obsolete this module, it is included in Prosody since 0.12
author | Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> |
---|---|
date | Sun, 27 Oct 2024 13:14:25 +0100 |
parents | 6024:fc607d79765a |
children | 6026:37c77676303b |
files | mod_smacks/README.md mod_smacks/mod_smacks.lua |
diffstat | 2 files changed, 0 insertions(+), 771 deletions(-) [+] |
line wrap: on
line diff
--- a/mod_smacks/README.md Wed Oct 23 14:55:17 2024 +0200 +++ b/mod_smacks/README.md Sun Oct 27 13:14:25 2024 +0100 @@ -6,84 +6,3 @@ ... Since Prosody 0.12, this module is [included in Prosody](https://prosody.im/doc/modules/mod_smacks). - -#Introduction - -By default XMPP is as reliable as your network is. Unfortunately in some -cases that is not very reliable - in some network conditions disconnects -can be frequent and message loss can occur. - -To overcome this, XMPP has an optional extension (XEP-0198: Stream -Management) which, when supported by both the client and server, can -allow a client to resume a disconnected session, and prevent message -loss. - -#Details - -When using XEP-0198 both the client and the server keep a queue of the -most recently sent stanzas - this is cleared when the other end -acknowledges they have received the stanzas. If the client disconnects, -instead of marking the user offline the server pretends the client is -still online for a short (configurable) period of time. If the client -reconnects within this period, any stanzas in the queue that the client -did not receive are re-sent. - -If the client fails to reconnect before the timeout it will be marked as -offline like prosody does on disconnect without mod_smacks. -If the client is the last one for this jid, all message stanzas are added to -the offline store and all other stanzas stanzas are returned with an -"recipient-unavailable" error. If the client is not the last one with an -open smacks session, *all* stanzas are returned with an "recipient-unavailable" error. - -If you deliberately disabled [mod_offline], all message stanzas of the last client -are also returned with an "recipient-unavailable" error, because the can not be -added to the offline storage. -If you don't want this behaviour you can use [mod_nooffline_noerror] to suppress the error. -This is generally only advisable, if you are sure that all your clients are using MAM! - -This module also provides some events used by [mod_cloud_notify]. -These events are: "smacks-ack-delayed", "smacks-hibernation-start" and -"smacks-hibernation-end". See [mod_cloud_notify] for details on how this -events are used there. - -Use prosody 0.10+ to have per user limits on allowed sessions in hibernation -state and allowed sessions for which the h-value is kept even after the -hibernation timed out. -These are settable using `smacks_max_hibernated_sessions` and `smacks_max_old_sessions`. - -#Configuration - - Option Default Description - ---------------------------------- ----------------- ------------------------------------------------------------------------------------------------------------------ - `smacks_hibernation_time` 600 (10 minutes) The number of seconds a disconnected session should stay alive for (to allow reconnect) - `smacks_enabled_s2s` true Enable Stream Management on server connections? *Experimental* - `smacks_s2s_resend` false Attempt to re-send unacked messages on s2s disconnect *Experimental* - `smacks_max_unacked_stanzas` 0 How many stanzas to send before requesting acknowledgement - `smacks_max_ack_delay` 30 (1/2 minute) The number of seconds an ack must be unanswered to trigger an "smacks-ack-delayed" event - `smacks_max_hibernated_sessions` 10 The number of allowed sessions in hibernated state (limited per user) - `smacks_max_old_sessions` 10 The number of allowed sessions with timed out hibernation for which the h-value is still kept (limited per user) - -#Compatibility - - ------- ------- - trunk Works - 0.11 Works - ------- ------- - - -#Clients - -Clients that support [XEP-0198]: - -- Gajim (Linux, Windows, OS X) -- Conversations (Android) -- ChatSecure (iOS) -- Swift (but not resumption, as of version 2.0 and alphas of 3.0) -- Psi (in an unreleased branch) -- Yaxim (Android) -- Monal (iOS) - -[7693724881b3]: //hg.prosody.im/prosody-modules/raw-file/7693724881b3/mod_smacks/mod_smacks.lua -[mod_offline]: //modules.prosody.im/mod_offline -[mod_nooffline_noerror]: //modules.prosody.im/mod_nooffline_noerror -[mod_cloud_notify]: //modules.prosody.im/mod_cloud_notify
--- a/mod_smacks/mod_smacks.lua Wed Oct 23 14:55:17 2024 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,690 +0,0 @@ --- XEP-0198: Stream Management for Prosody IM --- --- Copyright (C) 2010-2015 Matthew Wild --- Copyright (C) 2010 Waqas Hussain --- Copyright (C) 2012-2021 Kim Alvefur --- Copyright (C) 2012 Thijs Alkemade --- Copyright (C) 2014 Florian Zeitz --- Copyright (C) 2016-2020 Thilo Molitor --- --- This project is MIT/X11 licensed. Please see the --- COPYING file in the source package for more information. --- - -local st = require "util.stanza"; -local dep = require "util.dependencies"; -local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+ -local uuid_generate = require "util.uuid".generate; -local jid = require "util.jid"; - -local t_remove = table.remove; -local math_min = math.min; -local math_max = math.max; -local os_time = os.time; -local tonumber, tostring = tonumber, tostring; -local add_filter = require "util.filters".add_filter; -local timer = require "util.timer"; -local datetime = require "util.datetime"; - -local xmlns_mam2 = "urn:xmpp:mam:2"; -local xmlns_sm2 = "urn:xmpp:sm:2"; -local xmlns_sm3 = "urn:xmpp:sm:3"; -local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; -local xmlns_delay = "urn:xmpp:delay"; - -local sm2_attr = { xmlns = xmlns_sm2 }; -local sm3_attr = { xmlns = xmlns_sm3 }; - -local resume_timeout = module:get_option_number("smacks_hibernation_time", 600); -local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); -local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); -local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); -local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); -local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30); -local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10); -local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10); -local core_process_stanza = prosody.core_process_stanza; -local sessionmanager = require"core.sessionmanager"; - -assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0"); -assert(max_old_sessions > 0, "smacks_max_old_sessions must be greater than 0"); - -local c2s_sessions = module:shared("/*/c2s/sessions"); - -local function init_session_cache(max_entries, evict_callback) - -- old prosody version < 0.10 (no limiting at all!) - if not cache then - local store = {}; - return { - get = function(user, key) - if not user then return nil; end - if not key then return nil; end - return store[key]; - end; - set = function(user, key, value) - if not user then return nil; end - if not key then return nil; end - store[key] = value; - end; - }; - end - - -- use per user limited cache for prosody >= 0.10 - local stores = {}; - return { - get = function(user, key) - if not user then return nil; end - if not key then return nil; end - if not stores[user] then - stores[user] = cache.new(max_entries, evict_callback); - end - return stores[user]:get(key); - end; - set = function(user, key, value) - if not user then return nil; end - if not key then return nil; end - if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end - stores[user]:set(key, value); - -- remove empty caches completely - if not stores[user]:count() then stores[user] = nil; end - end; - }; -end -local old_session_registry = init_session_cache(max_old_sessions, nil); -local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) - if session.destroyed then return true; end -- destroyed session can always be removed from cache - session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token); - -- store old session's h values on force delete - -- save only actual h value and username/host (for security) - old_session_registry.set(session.username, resumption_token, { - h = session.handled_stanza_count, - username = session.username, - host = session.host - }); - return true; -- allow session to be removed from full cache to make room for new one -end); - -local function stoppable_timer(delay, callback) - local stopped = false; - local timer = module:add_timer(delay, function (t) - if stopped then return; end - return callback(t); - end); - if timer and timer.stop then return timer; end -- new prosody api includes stop() function - return { - stop = function(self) stopped = true end; - timer; - }; -end - -local function delayed_ack_function(session, stanza) - -- fire event only if configured to do so and our session is not already hibernated or destroyed - if delayed_ack_timeout > 0 and session.awaiting_ack - and not session.hibernating and not session.destroyed then - session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", - session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); - module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza}); - end - session.delayed_ack_timer = nil; -end - -local function can_do_smacks(session, advertise_only) - if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end - - local session_type = session.type; - if session.username then - if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm - return false, "unexpected-request", "Client must bind a resource before enabling stream management"; - end - return true; - elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then - return true; - end - return false, "service-unavailable", "Stream management is not available for this stream"; -end - -module:hook("stream-features", - function (event) - if can_do_smacks(event.origin, true) then - event.features:tag("sm", sm2_attr):tag("optional"):up():up(); - event.features:tag("sm", sm3_attr):tag("optional"):up():up(); - end - end); - -module:hook("s2s-stream-features", - function (event) - if can_do_smacks(event.origin, true) then - event.features:tag("sm", sm2_attr):tag("optional"):up():up(); - event.features:tag("sm", sm3_attr):tag("optional"):up():up(); - end - end); - -local function request_ack_if_needed(session, force, reason, stanza) - local queue = session.outgoing_stanza_queue; - local expected_h = session.last_acknowledged_stanza + #queue; - -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); - local max_unacked = max_unacked_stanzas; - if session.state == "inactive" then - max_unacked = max_inactive_unacked_stanzas; - end - if session.awaiting_ack == nil and not session.hibernating then - -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong - -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any - -- further requests until a higher h-value would be expected. - -- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h)); - if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then - session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); - session.awaiting_ack = false; - session.awaiting_ack_timer = stoppable_timer(1e-06, function () - -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); - -- only request ack if needed and our session is not already hibernated or destroyed - if not session.awaiting_ack and not session.hibernating and not session.destroyed then - session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) - if session.destroyed then return end -- sending something can trigger destruction - session.awaiting_ack = true; - -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) - session.last_requested_h = session.last_acknowledged_stanza + #queue; - session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); - if not session.delayed_ack_timer then - session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() - delayed_ack_function(session, nil); -- we don't know if this is the only new stanza in the queue - end); - end - end - end); - end - end - - -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue - -- and there isn't already a timer for this event running. - -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event - -- would not trigger this event (again). - if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then - session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); - delayed_ack_function(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules - end -end - -local function outgoing_stanza_filter(stanza, session) - -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's - -- supposed to be nil. - -- However, when using mod_smacks with mod_websocket, then mod_websocket's - -- stanzas/out filter can get called before this one and adds the xmlns. - local is_stanza = stanza.attr and - (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') - and not stanza.name:find":"; - - if is_stanza and not stanza._cached then - local queue = session.outgoing_stanza_queue; - local cached_stanza = st.clone(stanza); - cached_stanza._cached = true; - - if cached_stanza and cached_stanza.name ~= "iq" and cached_stanza:get_child("delay", xmlns_delay) == nil then - cached_stanza = cached_stanza:tag("delay", { - xmlns = xmlns_delay, - from = jid.bare(session.full_jid or session.host), - stamp = datetime.datetime() - }); - end - - queue[#queue+1] = cached_stanza; - if session.hibernating then - session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); - module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); - return nil; - end - request_ack_if_needed(session, false, "outgoing_stanza_filter", stanza); - end - return stanza; -end - -local function count_incoming_stanzas(stanza, session) - if not stanza.attr.xmlns then - session.handled_stanza_count = session.handled_stanza_count + 1; - session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); - end - return stanza; -end - -local function wrap_session_out(session, resume) - if not resume then - session.outgoing_stanza_queue = {}; - session.last_acknowledged_stanza = 0; - end - - add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); - - local session_close = session.close; - function session.close(...) - if session.resumption_token then - session_registry.set(session.username, session.resumption_token, nil); - old_session_registry.set(session.username, session.resumption_token, nil); - session.resumption_token = nil; - end - -- send out last ack as per revision 1.5.2 of XEP-0198 - if session.smacks and session.conn and session.handled_stanza_count then - (session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = string.format("%d", session.handled_stanza_count) })); - end - return session_close(...); - end - return session; -end - -local function wrap_session_in(session, resume) - if not resume then - session.handled_stanza_count = 0; - end - add_filter(session, "stanzas/in", count_incoming_stanzas, 999); - - return session; -end - -local function wrap_session(session, resume) - wrap_session_out(session, resume); - wrap_session_in(session, resume); - return session; -end - -function handle_enable(session, stanza, xmlns_sm) - local ok, err, err_text = can_do_smacks(session); - if not ok then - session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it? - (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); - return true; - end - - module:log("debug", "Enabling stream management"); - session.smacks = xmlns_sm; - - wrap_session(session, false); - - local resume_token; - local resume = stanza.attr.resume; - if resume == "true" or resume == "1" then - resume_token = uuid_generate(); - session_registry.set(session.username, resume_token, session); - session.resumption_token = resume_token; - end - (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) })); - return true; -end -module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); -module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); - -module:hook_stanza("http://etherx.jabber.org/streams", "features", - function (session, stanza) - stoppable_timer(1e-6, function () - if can_do_smacks(session) then - if stanza:get_child("sm", xmlns_sm3) then - session.sends2s(st.stanza("enable", sm3_attr)); - session.smacks = xmlns_sm3; - elseif stanza:get_child("sm", xmlns_sm2) then - session.sends2s(st.stanza("enable", sm2_attr)); - session.smacks = xmlns_sm2; - else - return; - end - wrap_session_out(session, false); - end - end); - end); - -function handle_enabled(session, stanza, xmlns_sm) - module:log("debug", "Enabling stream management"); - session.smacks = xmlns_sm; - - wrap_session_in(session, false); - - -- FIXME Resume? - - return true; -end -module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); -module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); - -function handle_r(origin, stanza, xmlns_sm) - if not origin.smacks then - module:log("debug", "Received ack request from non-smack-enabled session"); - return; - end - module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); - -- Reply with <a> - (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); - -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) - local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; - if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then - request_ack_if_needed(origin, true, "piggybacked by handle_r", nil); - end - return true; -end -module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); -module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); - -function handle_a(origin, stanza) - if not origin.smacks then return; end - origin.awaiting_ack = nil; - if origin.awaiting_ack_timer then - origin.awaiting_ack_timer:stop(); - end - if origin.delayed_ack_timer then - origin.delayed_ack_timer:stop(); - origin.delayed_ack_timer = nil; - end - -- Remove handled stanzas from outgoing_stanza_queue - -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); - local h = tonumber(stanza.attr.h); - if not h then - origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; - return; - end - local handled_stanza_count = h-origin.last_acknowledged_stanza; - local queue = origin.outgoing_stanza_queue; - if handled_stanza_count > #queue then - origin.log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", - handled_stanza_count, #queue); - origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza); - for i=1,#queue do - origin.log("debug", "Q item %d: %s", i, tostring(queue[i])); - end - origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; }; - return; - end - - for i=1,math_min(handled_stanza_count,#queue) do - local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1); - module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); - end - - origin.log("debug", "#queue = %d", #queue); - origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; - request_ack_if_needed(origin, false, "handle_a", nil) - return true; -end -module:hook_stanza(xmlns_sm2, "a", handle_a); -module:hook_stanza(xmlns_sm3, "a", handle_a); - ---TODO: Optimise... incoming stanzas should be handled by a per-session --- function that has a counter as an upvalue (no table indexing for increments, --- and won't slow non-198 sessions). We can also then remove the .handled flag --- on stanzas - -local function handle_unacked_stanzas(session) - local queue = session.outgoing_stanza_queue; - local error_attr = { type = "cancel" }; - if #queue > 0 then - session.outgoing_stanza_queue = {}; - for i=1,#queue do - if not module:fire_event("delivery/failure", { session = session, stanza = queue[i] }) then - if queue[i].attr.type ~= "error" then - local reply = st.reply(queue[i]); - if reply.attr.to ~= session.full_jid then - reply.attr.type = "error"; - reply:tag("error", error_attr) - :tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}); - core_process_stanza(session, reply); - end - end - end - end - end -end - --- don't send delivery errors for messages which will be delivered by mam later on --- check if stanza was archived --> this will allow us to send back errors for stanzas not archived --- because the user configured the server to do so ("no-archive"-setting for one special contact for example) -local function get_stanza_id(stanza, by_jid) - for tag in stanza:childtags("stanza-id", "urn:xmpp:sid:0") do - if tag.attr.by == by_jid then - return tag.attr.id; - end - end - return nil; -end -module:hook("delivery/failure", function(event) - local session, stanza = event.session, event.stanza; - -- Only deal with authenticated (c2s) sessions - if session.username then - if stanza.name == "message" and stanza.attr.xmlns == nil and - ( stanza.attr.type == "chat" or ( stanza.attr.type or "normal" ) == "normal" ) then - -- don't store messages in offline store if they are mam results - local mam_result = stanza:get_child("result", xmlns_mam2); - if mam_result ~= nil then - return true; -- stanza already "handled", don't send an error and don't add it to offline storage - end - -- do nothing here for normal messages and don't send out "message delivery errors", - -- because messages are already in MAM at this point (no need to frighten users) - local stanza_id = get_stanza_id(stanza, jid.bare(session.full_jid)); - if session.mam_requested and stanza_id ~= nil then - session.log("debug", "mod_smacks delivery/failure returning true for mam-handled stanza: mam-archive-id=%s", tostring(stanza_id)); - return true; -- stanza handled, don't send an error - end - -- store message in offline store, if this client does not use mam *and* was the last client online - local sessions = prosody.hosts[module.host].sessions[session.username] and - prosody.hosts[module.host].sessions[session.username].sessions or nil; - if sessions and next(sessions) == session.resource and next(sessions, session.resource) == nil then - local ok = module:fire_event("message/offline/handle", { origin = session, username = session.username, stanza = stanza }); - session.log("debug", "mod_smacks delivery/failuere returning %s for offline-handled stanza", tostring(ok)); - return ok; -- if stanza was handled, don't send an error - end - end - end -end); - -module:hook("pre-resource-unbind", function (event) - local session, err = event.session, event.error; - if session.smacks then - if not session.resumption_token then - local queue = session.outgoing_stanza_queue; - if #queue > 0 then - session.log("debug", "Destroying session with %d unacked stanzas", #queue); - handle_unacked_stanzas(session); - end - else - session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout); - local hibernate_time = os_time(); -- Track the time we went into hibernation - session.hibernating = hibernate_time; - local resumption_token = session.resumption_token; - module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue}); - timer.add_task(resume_timeout, function () - session.log("debug", "mod_smacks hibernation timeout reached..."); - -- We need to check the current resumption token for this resource - -- matches the smacks session this timer is for in case it changed - -- (for example, the client may have bound a new resource and - -- started a new smacks session, or not be using smacks) - local curr_session = full_sessions[session.full_jid]; - if session.destroyed then - session.log("debug", "The session has already been destroyed"); - elseif curr_session and curr_session.resumption_token == resumption_token - -- Check the hibernate time still matches what we think it is, - -- otherwise the session resumed and re-hibernated. - and session.hibernating == hibernate_time then - -- wait longer if the timeout isn't reached because push was enabled for this session - -- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients - -- wait for an additional resume_timeout seconds if no push occurred since hibernation at all - local current_time = os_time(); - local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating); - if session.push_identifier ~= nil and not session.first_hibernated_push then - session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout); - return resume_timeout; - end - if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then - session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout-(current_time-timeout_start)); - return resume_timeout-(current_time-timeout_start); -- time left to wait - end - session.log("debug", "Destroying session for hibernating too long"); - session_registry.set(session.username, session.resumption_token, nil); - -- save only actual h value and username/host (for security) - old_session_registry.set(session.username, session.resumption_token, { - h = session.handled_stanza_count, - username = session.username, - host = session.host - }); - session.resumption_token = nil; - sessionmanager.destroy_session(session); - else - session.log("debug", "Session resumed before hibernation timeout, all is well") - end - end); - return true; -- Postpone destruction for now - end - end -end); - -local function handle_s2s_destroyed(event) - local session = event.session; - local queue = session.outgoing_stanza_queue; - if queue and #queue > 0 then - session.log("warn", "Destroying session with %d unacked stanzas", #queue); - if s2s_resend then - for i = 1, #queue do - module:send(queue[i]); - end - session.outgoing_stanza_queue = nil; - else - handle_unacked_stanzas(session); - end - end -end - -module:hook("s2sout-destroyed", handle_s2s_destroyed); -module:hook("s2sin-destroyed", handle_s2s_destroyed); - -local function get_session_id(session) - return session.id or (tostring(session):match("[a-f0-9]+$")); -end - -function handle_resume(session, stanza, xmlns_sm) - if session.full_jid then - session.log("warn", "Tried to resume after resource binding"); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("unexpected-request", { xmlns = xmlns_errors }) - ); - return true; - end - - local id = stanza.attr.previd; - local original_session = session_registry.get(session.username, id); - if not original_session then - session.log("debug", "Tried to resume non-existent session with id %s", id); - local old_session = old_session_registry.get(session.username, id); - if old_session and session.username == old_session.username - and session.host == old_session.host - and old_session.h then - session.send(st.stanza("failed", { xmlns = xmlns_sm, h = string.format("%d", old_session.h) }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); - else - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("item-not-found", { xmlns = xmlns_errors }) - ); - end; - elseif session.username == original_session.username - and session.host == original_session.host then - session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); - original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); - -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) - if original_session.conn then - original_session.log("debug", "mod_smacks closing an old connection for this session"); - local conn = original_session.conn; - c2s_sessions[conn] = nil; - conn:close(); - end - local migrated_session_log = session.log; - original_session.ip = session.ip; - original_session.conn = session.conn; - original_session.send = session.send; - original_session.close = session.close; - original_session.filter = session.filter; - original_session.filter.session = original_session; - original_session.filters = session.filters; - original_session.stream = session.stream; - original_session.secure = session.secure; - original_session.hibernating = nil; - session.log = original_session.log; - session.type = original_session.type; - wrap_session(original_session, true); - -- Inform xmppstream of the new session (passed to its callbacks) - original_session.stream:set_session(original_session); - -- Similar for connlisteners - c2s_sessions[session.conn] = original_session; - - original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, - h = string.format("%d", original_session.handled_stanza_count), previd = id })); - - -- Fake an <a> with the h of the <resume/> from the client - original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm, - h = stanza.attr.h })); - - -- Ok, we need to re-send any stanzas that the client didn't see - -- ...they are what is now left in the outgoing stanza queue - -- We have to use the send of "session" because we don't want to add our resent stanzas - -- to the outgoing queue again - local queue = original_session.outgoing_stanza_queue; - session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", #queue); - for i=1,#queue do - session.send(queue[i]); - end - session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue); - function session.send(stanza) - migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); - return false; - end - module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); - request_ack_if_needed(original_session, true, "handle_resume", nil); - else - module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", - session.username or "?", session.host or "?", session.type, - original_session.username or "?", original_session.host or "?", original_session.type); - session.send(st.stanza("failed", { xmlns = xmlns_sm }) - :tag("not-authorized", { xmlns = xmlns_errors })); - end - return true; -end -module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); -module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); - -module:hook("csi-client-active", function (event) - if event.origin.smacks then - request_ack_if_needed(event.origin, true, "csi-active", nil); - end -end); - -module:hook("csi-flushing", function(event) - local session = event.session; - if session.smacks then - if not session.awaiting_ack and not session.hibernating and not session.destroyed then - session.log("debug", "Sending <r> (csi-flushing)"); - session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) - end - end -end); - -local function handle_read_timeout(event) - local session = event.session; - if session.smacks then - if session.awaiting_ack then - if session.awaiting_ack_timer then - session.awaiting_ack_timer:stop(); - end - if session.delayed_ack_timer then - session.delayed_ack_timer:stop(); - session.delayed_ack_timer = nil; - end - return false; -- Kick the session - end - session.log("debug", "Sending <r> (read timeout)"); - (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); - session.awaiting_ack = true; - if not session.delayed_ack_timer then - session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() - delayed_ack_function(session, nil); - end); - end - return true; - end -end - -module:hook("s2s-read-timeout", handle_read_timeout); -module:hook("c2s-read-timeout", handle_read_timeout);