Software /
code /
prosody-modules
Comparison
mod_smacks/mod_smacks.lua @ 2596:ffb6646b4253
Implement XEP-0198 revision 1.5.2 and limit number of hibernated sessions per user
Revision 1.5.2 allows sending h-values on resumes that fail due to hibernation timeout
and sending a smacks ack directly before the stream close tag.
I also made the timers used by this module stoppable, even for prosody 0.10 and below;
this makes the smacks-ack-delayed event more useful.
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Sun, 05 Mar 2017 20:23:53 +0100 |
parent | 2494:d300ae5dba87 |
child | 2608:362ca94192ee |
comparison
equal
deleted
inserted
replaced
2595:307ddebb72e1 | 2596:ffb6646b4253 |
---|---|
10 -- This project is MIT/X11 licensed. Please see the | 10 -- This project is MIT/X11 licensed. Please see the |
11 -- COPYING file in the source package for more information. | 11 -- COPYING file in the source package for more information. |
12 -- | 12 -- |
13 | 13 |
14 local st = require "util.stanza"; | 14 local st = require "util.stanza"; |
15 local dep = require "util.dependencies"; | |
16 local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+ | |
15 local uuid_generate = require "util.uuid".generate; | 17 local uuid_generate = require "util.uuid".generate; |
16 | 18 |
17 local t_insert, t_remove = table.insert, table.remove; | 19 local t_insert, t_remove = table.insert, table.remove; |
18 local math_min = math.min; | 20 local math_min = math.min; |
19 local os_time = os.time; | 21 local os_time = os.time; |
33 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300); | 35 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300); |
34 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false); | 36 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false); |
35 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); | 37 local s2s_resend = module:get_option_boolean("smacks_s2s_resend", false); |
36 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); | 38 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); |
37 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 60); | 39 local delayed_ack_timeout = module:get_option_number("smacks_max_ack_delay", 60); |
40 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10); | |
41 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10); | |
38 local core_process_stanza = prosody.core_process_stanza; | 42 local core_process_stanza = prosody.core_process_stanza; |
39 local sessionmanager = require"core.sessionmanager"; | 43 local sessionmanager = require"core.sessionmanager"; |
40 | 44 |
41 local c2s_sessions = module:shared("/*/c2s/sessions"); | 45 local c2s_sessions = module:shared("/*/c2s/sessions"); |
42 local session_registry = {}; | 46 |
47 local function init_session_cache(max_entries, evict_callback) | |
48 -- old prosody version < 0.10 (no limiting at all!) | |
49 if not cache then | |
50 local store = {}; | |
51 return { | |
52 get = function(user, key) return store[user.."@"..key]; end; | |
53 set = function(user, key, value) store[user.."@"..key] = value; end; | |
54 }; | |
55 end | |
56 | |
57 -- use per user limited cache for prosody >= 0.10 | |
58 local stores = {}; | |
59 return { | |
60 get = function(user, key) | |
61 if not stores[user] then | |
62 stores[user] = cache.new(max_entries, evict_callback); | |
63 end | |
64 return stores[user]:get(key); | |
65 end; | |
66 set = function(user, key, value) | |
67 if not stores[user] then stores[user] = cache.new(max_entries, evict_callback); end | |
68 stores[user]:set(key, value); | |
69 -- remove empty caches completely | |
70 if not stores[user]:count() then stores[user] = nil; end | |
71 end; | |
72 }; | |
73 end | |
74 local old_session_registry = init_session_cache(max_old_sessions, nil); | |
75 local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) | |
76 if session.destroyed then return; end | |
77 session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token); | |
78 -- store old session's h values on force delete | |
79 -- save only actual h value and username/host (for security) | |
80 old_session_registry.set(session.username, resumption_token, { | |
81 h = session.handled_stanza_count, | |
82 username = session.username, | |
83 host = session.host | |
84 }); | |
85 return true; -- allow session to be removed from full cache to make room for new one | |
86 end); | |
87 | |
88 local function stoppable_timer(delay, callback) | |
89 local stopped = false; | |
90 return { | |
91 stop = function () stopped = true end; | |
92 module:add_timer(delay, function (t) | |
93 if stopped then return; end | |
94 return callback(t); | |
95 end); | |
96 }; | |
97 end | |
43 | 98 |
44 local function delayed_ack_function(session) | 99 local function delayed_ack_function(session) |
45 -- fire event only when configured to do so | 100 -- fire event only if configured to do so and our session is not hibernated or destroyed |
46 if delayed_ack_timeout > 0 and session.awaiting_ack and not (session.outgoing_stanza_queue == nil) then | 101 if delayed_ack_timeout > 0 and session.awaiting_ack |
47 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", #session.outgoing_stanza_queue); | 102 and not session.hibernating and not session.destroyed then |
103 session.log("debug", "Firing event 'smacks-ack-delayed', queue = %d", | |
104 session.outgoing_stanza_queue and #session.outgoing_stanza_queue or 0); | |
48 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue}); | 105 module:fire_event("smacks-ack-delayed", {origin = session, queue = session.outgoing_stanza_queue}); |
49 end | 106 end |
50 session.delayed_ack_timer = nil; | 107 session.delayed_ack_timer = nil; |
51 end | 108 end |
52 | 109 |
84 local function request_ack_if_needed(session) | 141 local function request_ack_if_needed(session) |
85 local queue = session.outgoing_stanza_queue; | 142 local queue = session.outgoing_stanza_queue; |
86 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then | 143 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then |
87 session.log("debug", "Queuing <r> (in a moment)"); | 144 session.log("debug", "Queuing <r> (in a moment)"); |
88 session.awaiting_ack = false; | 145 session.awaiting_ack = false; |
89 session.awaiting_ack_timer = module:add_timer(1e-06, function () | 146 session.awaiting_ack_timer = stoppable_timer(1e-06, function () |
90 if not session.awaiting_ack then | 147 if not session.awaiting_ack then |
91 session.log("debug", "Sending <r> (inside timer, before send)"); | 148 session.log("debug", "Sending <r> (inside timer, before send)"); |
92 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 149 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
93 session.log("debug", "Sending <r> (inside timer, after send)"); | 150 session.log("debug", "Sending <r> (inside timer, after send)"); |
94 session.awaiting_ack = true; | 151 session.awaiting_ack = true; |
95 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() | 152 if not session.delayed_ack_timer then |
96 delayed_ack_function(session); | 153 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() |
97 end); | 154 delayed_ack_function(session); |
155 end); | |
156 end | |
98 end | 157 end |
99 end); | 158 end); |
100 end | 159 end |
101 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue | 160 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue |
102 -- and there isn't already a timer for this event running. | 161 -- and there isn't already a timer for this event running. |
147 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); | 206 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); |
148 | 207 |
149 local session_close = session.close; | 208 local session_close = session.close; |
150 function session.close(...) | 209 function session.close(...) |
151 if session.resumption_token then | 210 if session.resumption_token then |
152 session_registry[session.resumption_token] = nil; | 211 session_registry.set(session.username, session.resumption_token, nil); |
212 old_session_registry.set(session.username, session.resumption_token, nil); | |
153 session.resumption_token = nil; | 213 session.resumption_token = nil; |
214 end | |
215 -- send out last ack as per revision 1.5.2 of XEP-0198 | |
216 if session.smacks then | |
217 (session.sends2s or session.send)(st.stanza("a", { xmlns = session.smacks, h = tostring(session.handled_stanza_count) })); | |
154 end | 218 end |
155 return session_close(...); | 219 return session_close(...); |
156 end | 220 end |
157 return session; | 221 return session; |
158 end | 222 end |
187 | 251 |
188 local resume_token; | 252 local resume_token; |
189 local resume = stanza.attr.resume; | 253 local resume = stanza.attr.resume; |
190 if resume == "true" or resume == "1" then | 254 if resume == "true" or resume == "1" then |
191 resume_token = uuid_generate(); | 255 resume_token = uuid_generate(); |
192 session_registry[resume_token] = session; | 256 session_registry.set(session.username, resume_token, session); |
193 session.resumption_token = resume_token; | 257 session.resumption_token = resume_token; |
194 end | 258 end |
195 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); | 259 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); |
196 return true; | 260 return true; |
197 end | 261 end |
198 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); | 262 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); |
199 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); | 263 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); |
200 | 264 |
201 module:hook_stanza("http://etherx.jabber.org/streams", "features", | 265 module:hook_stanza("http://etherx.jabber.org/streams", "features", |
202 function (session, stanza) | 266 function (session, stanza) |
203 module:add_timer(1e-6, function () | 267 stoppable_timer(1e-6, function () |
204 if can_do_smacks(session) then | 268 if can_do_smacks(session) then |
205 if stanza:get_child("sm", xmlns_sm3) then | 269 if stanza:get_child("sm", xmlns_sm3) then |
206 session.sends2s(st.stanza("enable", sm3_attr)); | 270 session.sends2s(st.stanza("enable", sm3_attr)); |
207 session.smacks = xmlns_sm3; | 271 session.smacks = xmlns_sm3; |
208 elseif stanza:get_child("sm", xmlns_sm2) then | 272 elseif stanza:get_child("sm", xmlns_sm2) then |
251 if origin.delayed_ack_timer then | 315 if origin.delayed_ack_timer then |
252 origin.delayed_ack_timer:stop(); | 316 origin.delayed_ack_timer:stop(); |
253 origin.delayed_ack_timer = nil; | 317 origin.delayed_ack_timer = nil; |
254 end | 318 end |
255 -- Remove handled stanzas from outgoing_stanza_queue | 319 -- Remove handled stanzas from outgoing_stanza_queue |
256 log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); | 320 -- origin.log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); |
257 local h = tonumber(stanza.attr.h); | 321 local h = tonumber(stanza.attr.h); |
258 if not h then | 322 if not h then |
259 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; | 323 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; |
260 end | 324 end |
261 local handled_stanza_count = h-origin.last_acknowledged_stanza; | 325 local handled_stanza_count = h-origin.last_acknowledged_stanza; |
328 elseif curr_session and curr_session.resumption_token == resumption_token | 392 elseif curr_session and curr_session.resumption_token == resumption_token |
329 -- Check the hibernate time still matches what we think it is, | 393 -- Check the hibernate time still matches what we think it is, |
330 -- otherwise the session resumed and re-hibernated. | 394 -- otherwise the session resumed and re-hibernated. |
331 and session.hibernating == hibernate_time then | 395 and session.hibernating == hibernate_time then |
332 session.log("debug", "Destroying session for hibernating too long"); | 396 session.log("debug", "Destroying session for hibernating too long"); |
333 session_registry[session.resumption_token] = nil; | 397 session_registry.set(session.username, session.resumption_token, nil); |
398 -- save only actual h value and username/host (for security) | |
399 old_session_registry.set(session.username, session.resumption_token, { | |
400 h = session.handled_stanza_count, | |
401 username = session.username, | |
402 host = session.host | |
403 }); | |
334 session.resumption_token = nil; | 404 session.resumption_token = nil; |
335 sessionmanager.destroy_session(session); | 405 sessionmanager.destroy_session(session); |
336 else | 406 else |
337 session.log("debug", "Session resumed before hibernation timeout, all is well") | 407 session.log("debug", "Session resumed before hibernation timeout, all is well") |
338 end | 408 end |
370 ); | 440 ); |
371 return true; | 441 return true; |
372 end | 442 end |
373 | 443 |
374 local id = stanza.attr.previd; | 444 local id = stanza.attr.previd; |
375 local original_session = session_registry[id]; | 445 local original_session = session_registry.get(session.username, id); |
376 if not original_session then | 446 if not original_session then |
377 session.log("debug", "Tried to resume non-existent session with id %s", id); | 447 session.log("debug", "Tried to resume non-existent session with id %s", id); |
378 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 448 local old_session = old_session_registry.get(session.username, id); |
379 :tag("item-not-found", { xmlns = xmlns_errors }) | 449 if old_session and session.username == old_session.username |
380 ); | 450 and session.host == old_session.host |
451 and old_session.h then | |
452 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = tostring(old_session.h) }) | |
453 :tag("item-not-found", { xmlns = xmlns_errors }) | |
454 ); | |
455 else | |
456 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | |
457 :tag("item-not-found", { xmlns = xmlns_errors }) | |
458 ); | |
459 end; | |
381 elseif session.username == original_session.username | 460 elseif session.username == original_session.username |
382 and session.host == original_session.host then | 461 and session.host == original_session.host then |
383 session.log("debug", "mod_smacks resuming existing session..."); | 462 session.log("debug", "mod_smacks resuming existing session..."); |
384 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | 463 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) |
385 if original_session.conn then | 464 if original_session.conn then |
446 end | 525 end |
447 session.log("debug", "Sending <r> (read timeout)"); | 526 session.log("debug", "Sending <r> (read timeout)"); |
448 session.awaiting_ack = false; | 527 session.awaiting_ack = false; |
449 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); | 528 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); |
450 session.awaiting_ack = true; | 529 session.awaiting_ack = true; |
451 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() | 530 if not session.delayed_ack_timer then |
452 delayed_ack_function(session); | 531 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() |
453 end); | 532 delayed_ack_function(session); |
533 end); | |
534 end | |
454 return true; | 535 return true; |
455 end | 536 end |
456 end | 537 end |
457 | 538 |
458 module:hook("s2s-read-timeout", handle_read_timeout); | 539 module:hook("s2s-read-timeout", handle_read_timeout); |