Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 11934:65cdb1b21db3
mod_smacks: Import from prosody-modules @ eb63890ae8fc
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Tue, 16 Nov 2021 21:15:22 +0100 |
child | 11935:4d0d10fabb82 |
comparison
equal
deleted
inserted
replaced
11933:f752427a5214 | 11934:65cdb1b21db3 |
---|---|
1 -- XEP-0198: Stream Management for Prosody IM | |
2 -- | |
3 -- Copyright (C) 2010-2015 Matthew Wild | |
4 -- Copyright (C) 2010 Waqas Hussain | |
5 -- Copyright (C) 2012-2021 Kim Alvefur | |
6 -- Copyright (C) 2012 Thijs Alkemade | |
7 -- Copyright (C) 2014 Florian Zeitz | |
8 -- Copyright (C) 2016-2020 Thilo Molitor | |
9 -- | |
10 -- This project is MIT/X11 licensed. Please see the | |
11 -- COPYING file in the source package for more information. | |
12 -- | |
13 | |
14 local st = require "util.stanza"; | |
15 local dep = require "util.dependencies"; | |
16 local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+ | |
17 local uuid_generate = require "util.uuid".generate; | |
18 local jid = require "util.jid"; | |
19 | |
20 local t_remove = table.remove; | |
21 local math_min = math.min; | |
22 local math_max = math.max; | |
23 local os_time = os.time; | |
24 local tonumber, tostring = tonumber, tostring; | |
25 local add_filter = require "util.filters".add_filter; | |
26 local timer = require "util.timer"; | |
27 local datetime = require "util.datetime"; | |
28 | |
29 local xmlns_mam2 = "urn:xmpp:mam:2"; | |
30 local xmlns_sm2 = "urn:xmpp:sm:2"; | |
31 local xmlns_sm3 = "urn:xmpp:sm:3"; | |
32 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; | |
33 local xmlns_delay = "urn:xmpp:delay"; | |
34 | |
35 local sm2_attr = { xmlns = xmlns_sm2 }; | |
36 local sm3_attr = { xmlns = xmlns_sm3 }; | |
37 | |
38 local resume_timeout = module:get_option_number("smacks_hibernation_time", 600); | |
39 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", true); | |
40 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); | |
41 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); | |
42 local max_inactive_unacked_stanzas = module:get_option_number("smacks_max_inactive_unacked_stanzas", 256); | |
43 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 30); | |
44 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10); | |
45 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10); | |
46 local core_process_stanza = prosody.core_process_stanza; | |
47 local sessionmanager = require"core.sessionmanager"; | |
48 | |
49 assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0"); | |
50 assert(max_old_sessions > 0, "smacks_max_old_sessions must be greater than 0"); | |
51 | |
52 local c2s_sessions = module:shared("/*/c2s/sessions"); | |
53 | |
54 local function init_session_cache(max_entries, evict_callback) | |
55 -- old prosody version < 0.10 (no limiting at all!) | |
56 if not cache then | |
57 local store = {}; | |
58 return { | |
59 get = function(user, key) | |
60 if not user then return nil; end | |
61 if not key then return nil; end | |
62 return store[key]; | |
63 end; | |
64 set = function(user, key, value) | |
65 if not user then return nil; end | |
66 if not key then return nil; end | |
67 store[key] = value; | |
68 end; | |
69 }; | |
70 end | |
71 | |
72 -- use per user limited cache for prosody >= 0.10 | |
73 local stores = {}; | |
74 return { | |
75 get = function(user, key) | |
76 if not user then return nil; end | |
77 if not key then return nil; end | |
78 if not stores[user] then | |
79 stores[user] = cache.new(max_entries, evict_callback); | |
80 end | |
81 return stores[user]:get(key); | |
82 end; | |
83 set = function(user, key, value) | |
84 if not user then return nil; end | |
85 if not key then return nil; end | |
86 if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end | |
87 stores[user]:set(key, value); | |
88 -- remove empty caches completely | |
89 if not stores[user]:count() then stores[user] = nil; end | |
90 end; | |
91 }; | |
92 end | |
93 local old_session_registry = init_session_cache(max_old_sessions, nil); | |
94 local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) | |
95 if session.destroyed then return true; end -- destroyed session can always be removed from cache | |
96 session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token); | |
97 -- store old session's h values on force delete | |
98 -- save only actual h value and username/host (for security) | |
99 old_session_registry.set(session.username, resumption_token, { | |
100 h = session.handled_stanza_count, | |
101 username = session.username, | |
102 host = session.host | |
103 }); | |
104 return true; -- allow session to be removed from full cache to make room for new one | |
105 end); | |
106 | |
107 local function stoppable_timer(delay, callback) | |
108 local stopped = false; | |
109 local timer = module:add_timer(delay, function (t) | |
110 if stopped then return; end | |
111 return callback(t); | |
112 end); | |
113 if timer and timer.stop then return timer; end -- new prosody api includes stop() function | |
114 return { | |
115 stop = function(self) stopped = true end; | |
116 timer; | |
117 }; | |
118 end | |
119 | |
120 local function delayed_ack_function(session, stanza) | |
121 -- fire event only if configured to do so and our session is not already hibernated or destroyed | |
122 if delayed_ack_timeout > 0 and session.awaiting_ack | |
123 and not session.hibernating and not session.destroyed then | |
124 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", | |
125 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); | |
126 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue, stanza = stanza}); | |
127 end | |
128 session.delayed_ack_timer = nil; | |
129 end | |
130 | |
131 local function can_do_smacks(session, advertise_only) | |
132 if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end | |
133 | |
134 local session_type = session.type; | |
135 if session.username then | |
136 if not(advertise_only) and not(session.resource) then -- Fail unless we're only advertising sm | |
137 return false, "unexpected-request", "Client must bind a resource before enabling stream management"; | |
138 end | |
139 return true; | |
140 elseif s2s_smacks and (session_type == "s2sin" or session_type == "s2sout") then | |
141 return true; | |
142 end | |
143 return false, "service-unavailable", "Stream management is not available for this stream"; | |
144 end | |
145 | |
146 module:hook("stream-features", | |
147 function (event) | |
148 if can_do_smacks(event.origin, true) then | |
149 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); | |
150 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); | |
151 end | |
152 end); | |
153 | |
154 module:hook("s2s-stream-features", | |
155 function (event) | |
156 if can_do_smacks(event.origin, true) then | |
157 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); | |
158 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); | |
159 end | |
160 end); | |
161 | |
162 local function request_ack_if_needed(session, force, reason, stanza) | |
163 local queue = session.outgoing_stanza_queue; | |
164 local expected_h = session.last_acknowledged_stanza + #queue; | |
165 -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); | |
166 local max_unacked = max_unacked_stanzas; | |
167 if session.state == "inactive" then | |
168 max_unacked = max_inactive_unacked_stanzas; | |
169 end | |
170 if session.awaiting_ack == nil and not session.hibernating then | |
171 -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong | |
172 -- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any | |
173 -- further requests until a higher h-value would be expected. | |
174 -- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h)); | |
175 if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then | |
176 session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue); | |
177 session.awaiting_ack = false; | |
178 session.awaiting_ack_timer = stoppable_timer(1e-06, function () | |
179 -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); | |
180 -- only request ack if needed and our session is not already hibernated or destroyed | |
181 if not session.awaiting_ack and not session.hibernating and not session.destroyed then | |
182 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue); | |
183 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | |
184 if session.destroyed then return end -- sending something can trigger destruction | |
185 session.awaiting_ack = true; | |
186 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) | |
187 session.last_requested_h = session.last_acknowledged_stanza + #queue; | |
188 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue); | |
189 if not session.delayed_ack_timer then | |
190 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() | |
191 delayed_ack_function(session, nil); -- we don't know if this is the only new stanza in the queue | |
192 end); | |
193 end | |
194 end | |
195 end); | |
196 end | |
197 end | |
198 | |
199 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue | |
200 -- and there isn't already a timer for this event running. | |
201 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event | |
202 -- would not trigger this event (again). | |
203 if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then | |
204 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); | |
205 delayed_ack_function(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules | |
206 end | |
207 end | |
208 | |
209 local function outgoing_stanza_filter(stanza, session) | |
210 -- XXX: Normally you wouldn't have to check the xmlns for a stanza as it's | |
211 -- supposed to be nil. | |
212 -- However, when using mod_smacks with mod_websocket, then mod_websocket's | |
213 -- stanzas/out filter can get called before this one and adds the xmlns. | |
214 local is_stanza = stanza.attr and | |
215 (not stanza.attr.xmlns or stanza.attr.xmlns == 'jabber:client') | |
216 and not stanza.name:find":"; | |
217 | |
218 if is_stanza and not stanza._cached then | |
219 local queue = session.outgoing_stanza_queue; | |
220 local cached_stanza = st.clone(stanza); | |
221 cached_stanza._cached = true; | |
222 | |
223 if cached_stanza and cached_stanza.name ~= "iq" and cached_stanza:get_child("delay", xmlns_delay) == nil then | |
224 cached_stanza = cached_stanza:tag("delay", { | |
225 xmlns = xmlns_delay, | |
226 from = jid.bare(session.full_jid or session.host), | |
227 stamp = datetime.datetime() | |
228 }); | |
229 end | |
230 | |
231 queue[#queue+1] = cached_stanza; | |
232 if session.hibernating then | |
233 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); | |
234 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue, stanza = cached_stanza}); | |
235 return nil; | |
236 end | |
237 request_ack_if_needed(session, false, "outgoing_stanza_filter", stanza); | |
238 end | |
239 return stanza; | |
240 end | |
241 | |
242 local function count_incoming_stanzas(stanza, session) | |
243 if not stanza.attr.xmlns then | |
244 session.handled_stanza_count = session.handled_stanza_count + 1; | |
245 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); | |
246 end | |
247 return stanza; | |
248 end | |
249 | |
250 local function wrap_session_out(session, resume) | |
251 if not resume then | |
252 session.outgoing_stanza_queue = {}; | |
253 session.last_acknowledged_stanza = 0; | |
254 end | |
255 | |
256 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); | |
257 | |
258 local session_close = session.close; | |
259 function session.close(...) | |
260 if session.resumption_token then | |
261 session_registry.set(session.username, session.resumption_token, nil); | |
262 old_session_registry.set(session.username, session.resumption_token, nil); | |
263 session.resumption_token = nil; | |
264 end | |
265 -- send out last ack as per revision 1.5.2 of XEP-0198 | |
266 if session.smacks and session.conn and session.handled_stanza_count then | |
267 (session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = string.format("%d", session.handled_stanza_count) })); | |
268 end | |
269 return session_close(...); | |
270 end | |
271 return session; | |
272 end | |
273 | |
274 local function wrap_session_in(session, resume) | |
275 if not resume then | |
276 session.handled_stanza_count = 0; | |
277 end | |
278 add_filter(session, "stanzas/in", count_incoming_stanzas, 999); | |
279 | |
280 return session; | |
281 end | |
282 | |
283 local function wrap_session(session, resume) | |
284 wrap_session_out(session, resume); | |
285 wrap_session_in(session, resume); | |
286 return session; | |
287 end | |
288 | |
289 function handle_enable(session, stanza, xmlns_sm) | |
290 local ok, err, err_text = can_do_smacks(session); | |
291 if not ok then | |
292 session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it? | |
293 (session.sends2s or session.send)(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); | |
294 return true; | |
295 end | |
296 | |
297 module:log("debug", "Enabling stream management"); | |
298 session.smacks = xmlns_sm; | |
299 | |
300 wrap_session(session, false); | |
301 | |
302 local resume_token; | |
303 local resume = stanza.attr.resume; | |
304 if resume == "true" or resume == "1" then | |
305 resume_token = uuid_generate(); | |
306 session_registry.set(session.username, resume_token, session); | |
307 session.resumption_token = resume_token; | |
308 end | |
309 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) })); | |
310 return true; | |
311 end | |
312 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); | |
313 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); | |
314 | |
315 module:hook_stanza("http://etherx.jabber.org/streams", "features", | |
316 function (session, stanza) | |
317 stoppable_timer(1e-6, function () | |
318 if can_do_smacks(session) then | |
319 if stanza:get_child("sm", xmlns_sm3) then | |
320 session.sends2s(st.stanza("enable", sm3_attr)); | |
321 session.smacks = xmlns_sm3; | |
322 elseif stanza:get_child("sm", xmlns_sm2) then | |
323 session.sends2s(st.stanza("enable", sm2_attr)); | |
324 session.smacks = xmlns_sm2; | |
325 else | |
326 return; | |
327 end | |
328 wrap_session_out(session, false); | |
329 end | |
330 end); | |
331 end); | |
332 | |
333 function handle_enabled(session, stanza, xmlns_sm) | |
334 module:log("debug", "Enabling stream management"); | |
335 session.smacks = xmlns_sm; | |
336 | |
337 wrap_session_in(session, false); | |
338 | |
339 -- FIXME Resume? | |
340 | |
341 return true; | |
342 end | |
343 module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); | |
344 module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); | |
345 | |
346 function handle_r(origin, stanza, xmlns_sm) | |
347 if not origin.smacks then | |
348 module:log("debug", "Received ack request from non-smack-enabled session"); | |
349 return; | |
350 end | |
351 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); | |
352 -- Reply with <a> | |
353 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) })); | |
354 -- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h) | |
355 local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue; | |
356 if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then | |
357 request_ack_if_needed(origin, true, "piggybacked by handle_r", nil); | |
358 end | |
359 return true; | |
360 end | |
361 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); | |
362 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); | |
363 | |
364 function handle_a(origin, stanza) | |
365 if not origin.smacks then return; end | |
366 origin.awaiting_ack = nil; | |
367 if origin.awaiting_ack_timer then | |
368 origin.awaiting_ack_timer:stop(); | |
369 end | |
370 if origin.delayed_ack_timer then | |
371 origin.delayed_ack_timer:stop(); | |
372 origin.delayed_ack_timer = nil; | |
373 end | |
374 -- Remove handled stanzas from outgoing_stanza_queue | |
375 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); | |
376 local h = tonumber(stanza.attr.h); | |
377 if not h then | |
378 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; | |
379 return; | |
380 end | |
381 local handled_stanza_count = h-origin.last_acknowledged_stanza; | |
382 local queue = origin.outgoing_stanza_queue; | |
383 if handled_stanza_count > #queue then | |
384 origin.log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", | |
385 handled_stanza_count, #queue); | |
386 origin.log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza); | |
387 for i=1,#queue do | |
388 origin.log("debug", "Q item %d: %s", i, tostring(queue[i])); | |
389 end | |
390 origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; }; | |
391 return; | |
392 end | |
393 | |
394 for i=1,math_min(handled_stanza_count,#queue) do | |
395 local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1); | |
396 module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); | |
397 end | |
398 | |
399 origin.log("debug", "#queue = %d", #queue); | |
400 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; | |
401 request_ack_if_needed(origin, false, "handle_a", nil) | |
402 return true; | |
403 end | |
404 module:hook_stanza(xmlns_sm2, "a", handle_a); | |
405 module:hook_stanza(xmlns_sm3, "a", handle_a); | |
406 | |
407 --TODO: Optimise... incoming stanzas should be handled by a per-session | |
408 -- function that has a counter as an upvalue (no table indexing for increments, | |
409 -- and won't slow non-198 sessions). We can also then remove the .handled flag | |
410 -- on stanzas | |
411 | |
412 local function handle_unacked_stanzas(session) | |
413 local queue = session.outgoing_stanza_queue; | |
414 local error_attr = { type = "cancel" }; | |
415 if #queue > 0 then | |
416 session.outgoing_stanza_queue = {}; | |
417 for i=1,#queue do | |
418 if not module:fire_event("delivery/failure", { session = session, stanza = queue[i] }) then | |
419 if queue[i].attr.type ~= "error" then | |
420 local reply = st.reply(queue[i]); | |
421 if reply.attr.to ~= session.full_jid then | |
422 reply.attr.type = "error"; | |
423 reply:tag("error", error_attr) | |
424 :tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}); | |
425 core_process_stanza(session, reply); | |
426 end | |
427 end | |
428 end | |
429 end | |
430 end | |
431 end | |
432 | |
433 -- don't send delivery errors for messages which will be delivered by mam later on | |
434 -- check if stanza was archived --> this will allow us to send back errors for stanzas not archived | |
435 -- because the user configured the server to do so ("no-archive"-setting for one special contact for example) | |
436 local function get_stanza_id(stanza, by_jid) | |
437 for tag in stanza:childtags("stanza-id", "urn:xmpp:sid:0") do | |
438 if tag.attr.by == by_jid then | |
439 return tag.attr.id; | |
440 end | |
441 end | |
442 return nil; | |
443 end | |
444 module:hook("delivery/failure", function(event) | |
445 local session, stanza = event.session, event.stanza; | |
446 -- Only deal with authenticated (c2s) sessions | |
447 if session.username then | |
448 if stanza.name == "message" and stanza.attr.xmlns == nil and | |
449 ( stanza.attr.type == "chat" or ( stanza.attr.type or "normal" ) == "normal" ) then | |
450 -- don't store messages in offline store if they are mam results | |
451 local mam_result = stanza:get_child("result", xmlns_mam2); | |
452 if mam_result ~= nil then | |
453 return true; -- stanza already "handled", don't send an error and don't add it to offline storage | |
454 end | |
455 -- do nothing here for normal messages and don't send out "message delivery errors", | |
456 -- because messages are already in MAM at this point (no need to frighten users) | |
457 local stanza_id = get_stanza_id(stanza, jid.bare(session.full_jid)); | |
458 if session.mam_requested and stanza_id ~= nil then | |
459 session.log("debug", "mod_smacks delivery/failure returning true for mam-handled stanza: mam-archive-id=%s", tostring(stanza_id)); | |
460 return true; -- stanza handled, don't send an error | |
461 end | |
462 -- store message in offline store, if this client does not use mam *and* was the last client online | |
463 local sessions = prosody.hosts[module.host].sessions[session.username] and | |
464 prosody.hosts[module.host].sessions[session.username].sessions or nil; | |
465 if sessions and next(sessions) == session.resource and next(sessions, session.resource) == nil then | |
466 local ok = module:fire_event("message/offline/handle", { origin = session, username = session.username, stanza = stanza }); | |
467 session.log("debug", "mod_smacks delivery/failuere returning %s for offline-handled stanza", tostring(ok)); | |
468 return ok; -- if stanza was handled, don't send an error | |
469 end | |
470 end | |
471 end | |
472 end); | |
473 | |
474 module:hook("pre-resource-unbind", function (event) | |
475 local session, err = event.session, event.error; | |
476 if session.smacks then | |
477 if not session.resumption_token then | |
478 local queue = session.outgoing_stanza_queue; | |
479 if #queue > 0 then | |
480 session.log("debug", "Destroying session with %d unacked stanzas", #queue); | |
481 handle_unacked_stanzas(session); | |
482 end | |
483 else | |
484 session.log("debug", "mod_smacks hibernating session for up to %d seconds", resume_timeout); | |
485 local hibernate_time = os_time(); -- Track the time we went into hibernation | |
486 session.hibernating = hibernate_time; | |
487 local resumption_token = session.resumption_token; | |
488 module:fire_event("smacks-hibernation-start", {origin = session, queue = session.outgoing_stanza_queue}); | |
489 timer.add_task(resume_timeout, function () | |
490 session.log("debug", "mod_smacks hibernation timeout reached..."); | |
491 -- We need to check the current resumption token for this resource | |
492 -- matches the smacks session this timer is for in case it changed | |
493 -- (for example, the client may have bound a new resource and | |
494 -- started a new smacks session, or not be using smacks) | |
495 local curr_session = full_sessions[session.full_jid]; | |
496 if session.destroyed then | |
497 session.log("debug", "The session has already been destroyed"); | |
498 elseif curr_session and curr_session.resumption_token == resumption_token | |
499 -- Check the hibernate time still matches what we think it is, | |
500 -- otherwise the session resumed and re-hibernated. | |
501 and session.hibernating == hibernate_time then | |
502 -- wait longer if the timeout isn't reached because push was enabled for this session | |
503 -- session.first_hibernated_push is the starting point for hibernation timeouts of those push enabled clients | |
504 -- wait for an additional resume_timeout seconds if no push occurred since hibernation at all | |
505 local current_time = os_time(); | |
506 local timeout_start = math_max(session.hibernating, session.first_hibernated_push or session.hibernating); | |
507 if session.push_identifier ~= nil and not session.first_hibernated_push then | |
508 session.log("debug", "No push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout); | |
509 return resume_timeout; | |
510 end | |
511 if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then | |
512 session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout-(current_time-timeout_start)); | |
513 return resume_timeout-(current_time-timeout_start); -- time left to wait | |
514 end | |
515 session.log("debug", "Destroying session for hibernating too long"); | |
516 session_registry.set(session.username, session.resumption_token, nil); | |
517 -- save only actual h value and username/host (for security) | |
518 old_session_registry.set(session.username, session.resumption_token, { | |
519 h = session.handled_stanza_count, | |
520 username = session.username, | |
521 host = session.host | |
522 }); | |
523 session.resumption_token = nil; | |
524 sessionmanager.destroy_session(session); | |
525 else | |
526 session.log("debug", "Session resumed before hibernation timeout, all is well") | |
527 end | |
528 end); | |
529 return true; -- Postpone destruction for now | |
530 end | |
531 end | |
532 end); | |
533 | |
534 local function handle_s2s_destroyed(event) | |
535 local session = event.session; | |
536 local queue = session.outgoing_stanza_queue; | |
537 if queue and #queue > 0 then | |
538 session.log("warn", "Destroying session with %d unacked stanzas", #queue); | |
539 if s2s_resend then | |
540 for i = 1, #queue do | |
541 module:send(queue[i]); | |
542 end | |
543 session.outgoing_stanza_queue = nil; | |
544 else | |
545 handle_unacked_stanzas(session); | |
546 end | |
547 end | |
548 end | |
549 | |
550 module:hook("s2sout-destroyed", handle_s2s_destroyed); | |
551 module:hook("s2sin-destroyed", handle_s2s_destroyed); | |
552 | |
553 local function get_session_id(session) | |
554 return session.id or (tostring(session):match("[a-f0-9]+$")); | |
555 end | |
556 | |
557 function handle_resume(session, stanza, xmlns_sm) | |
558 if session.full_jid then | |
559 session.log("warn", "Tried to resume after resource binding"); | |
560 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | |
561 :tag("unexpected-request", { xmlns = xmlns_errors }) | |
562 ); | |
563 return true; | |
564 end | |
565 | |
566 local id = stanza.attr.previd; | |
567 local original_session = session_registry.get(session.username, id); | |
568 if not original_session then | |
569 session.log("debug", "Tried to resume non-existent session with id %s", id); | |
570 local old_session = old_session_registry.get(session.username, id); | |
571 if old_session and session.username == old_session.username | |
572 and session.host == old_session.host | |
573 and old_session.h then | |
574 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = string.format("%d", old_session.h) }) | |
575 :tag("item-not-found", { xmlns = xmlns_errors }) | |
576 ); | |
577 else | |
578 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | |
579 :tag("item-not-found", { xmlns = xmlns_errors }) | |
580 ); | |
581 end; | |
582 elseif session.username == original_session.username | |
583 and session.host == original_session.host then | |
584 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); | |
585 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); | |
586 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | |
587 if original_session.conn then | |
588 original_session.log("debug", "mod_smacks closing an old connection for this session"); | |
589 local conn = original_session.conn; | |
590 c2s_sessions[conn] = nil; | |
591 conn:close(); | |
592 end | |
593 local migrated_session_log = session.log; | |
594 original_session.ip = session.ip; | |
595 original_session.conn = session.conn; | |
596 original_session.send = session.send; | |
597 original_session.close = session.close; | |
598 original_session.filter = session.filter; | |
599 original_session.filter.session = original_session; | |
600 original_session.filters = session.filters; | |
601 original_session.stream = session.stream; | |
602 original_session.secure = session.secure; | |
603 original_session.hibernating = nil; | |
604 session.log = original_session.log; | |
605 session.type = original_session.type; | |
606 wrap_session(original_session, true); | |
607 -- Inform xmppstream of the new session (passed to its callbacks) | |
608 original_session.stream:set_session(original_session); | |
609 -- Similar for connlisteners | |
610 c2s_sessions[session.conn] = original_session; | |
611 | |
612 original_session.send(st.stanza("resumed", { xmlns = xmlns_sm, | |
613 h = string.format("%d", original_session.handled_stanza_count), previd = id })); | |
614 | |
615 -- Fake an <a> with the h of the <resume/> from the client | |
616 original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm, | |
617 h = stanza.attr.h })); | |
618 | |
619 -- Ok, we need to re-send any stanzas that the client didn't see | |
620 -- ...they are what is now left in the outgoing stanza queue | |
621 -- We have to use the send of "session" because we don't want to add our resent stanzas | |
622 -- to the outgoing queue again | |
623 local queue = original_session.outgoing_stanza_queue; | |
624 session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", #queue); | |
625 for i=1,#queue do | |
626 session.send(queue[i]); | |
627 end | |
628 session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue); | |
629 function session.send(stanza) | |
630 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | |
631 return false; | |
632 end | |
633 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue}); | |
634 request_ack_if_needed(original_session, true, "handle_resume", nil); | |
635 else | |
636 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", | |
637 session.username or "?", session.host or "?", session.type, | |
638 original_session.username or "?", original_session.host or "?", original_session.type); | |
639 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | |
640 :tag("not-authorized", { xmlns = xmlns_errors })); | |
641 end | |
642 return true; | |
643 end | |
644 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | |
645 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | |
646 | |
647 module:hook("csi-client-active", function (event) | |
648 if event.origin.smacks then | |
649 request_ack_if_needed(event.origin, true, "csi-active", nil); | |
650 end | |
651 end); | |
652 | |
653 module:hook("csi-flushing", function(event) | |
654 local session = event.session; | |
655 if session.smacks then | |
656 if not session.awaiting_ack and not session.hibernating and not session.destroyed then | |
657 session.log("debug", "Sending <r> (csi-flushing)"); | |
658 session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first | |
659 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | |
660 end | |
661 end | |
662 end); | |
663 | |
664 local function handle_read_timeout(event) | |
665 local session = event.session; | |
666 if session.smacks then | |
667 if session.awaiting_ack then | |
668 if session.awaiting_ack_timer then | |
669 session.awaiting_ack_timer:stop(); | |
670 end | |
671 if session.delayed_ack_timer then | |
672 session.delayed_ack_timer:stop(); | |
673 session.delayed_ack_timer = nil; | |
674 end | |
675 return false; -- Kick the session | |
676 end | |
677 session.log("debug", "Sending <r> (read timeout)"); | |
678 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); | |
679 session.awaiting_ack = true; | |
680 if not session.delayed_ack_timer then | |
681 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() | |
682 delayed_ack_function(session, nil); | |
683 end); | |
684 end | |
685 return true; | |
686 end | |
687 end | |
688 | |
689 module:hook("s2s-read-timeout", handle_read_timeout); | |
690 module:hook("c2s-read-timeout", handle_read_timeout); |