Software /
code /
prosody
Comparison
plugins/mod_cloud_notify.lua @ 13616:2f38f3275a74
mod_cloud_notify: Merge from prosody-modules@fc521fb5ffa0
Many thanks to Thilo Molitor and Kim Alvefur for their work on this module
while it was in the community repository. It has been stable for some time, is
widely used, and provides a feature that is important to most deployments.
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Thu, 09 Jan 2025 16:49:27 +0000 |
child | 13701:1aa7efabeacb |
comparison
equal
deleted
inserted
replaced
13615:b03b5716e4cf | 13616:2f38f3275a74 |
---|---|
1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) | |
2 -- Copyright (C) 2015-2016 Kim Alvefur | |
3 -- Copyright (C) 2017-2019 Thilo Molitor | |
4 -- | |
5 -- This file is MIT/X11 licensed. | |
6 | |
7 local os_time = os.time; | |
8 local st = require"util.stanza"; | |
9 local jid = require"util.jid"; | |
10 local dataform = require"util.dataforms".new; | |
11 local hashes = require"util.hashes"; | |
12 local random = require"util.random"; | |
13 local cache = require"util.cache"; | |
14 local watchdog = require "util.watchdog"; | |
15 | |
16 local xmlns_push = "urn:xmpp:push:0"; | |
17 | |
18 -- configuration | |
19 local include_body = module:get_option_boolean("push_notification_with_body", false); | |
20 local include_sender = module:get_option_boolean("push_notification_with_sender", false); | |
21 local max_push_errors = module:get_option_number("push_max_errors", 16); | |
22 local max_push_devices = module:get_option_number("push_max_devices", 5); | |
23 local dummy_body = module:get_option_string("push_notification_important_body", "New Message!"); | |
24 local extended_hibernation_timeout = module:get_option_number("push_max_hibernation_timeout", 72*3600); -- use same timeout like ejabberd | |
25 | |
26 local host_sessions = prosody.hosts[module.host].sessions; | |
27 local push_errors = module:shared("push_errors"); | |
28 local id2node = {}; | |
29 local id2identifier = {}; | |
30 | |
31 -- For keeping state across reloads while caching reads | |
32 -- This uses util.cache for caching the most recent devices and removing all old devices when max_push_devices is reached | |
33 local push_store = (function() | |
34 local store = module:open_store(); | |
35 local push_services = {}; | |
36 local api = {}; | |
37 --luacheck: ignore 212/self | |
38 function api:get(user) | |
39 if not push_services[user] then | |
40 local loaded, err = store:get(user); | |
41 if not loaded and err then | |
42 module:log("warn", "Error reading push notification storage for user '%s': %s", user, tostring(err)); | |
43 push_services[user] = cache.new(max_push_devices):table(); | |
44 return push_services[user], false; | |
45 end | |
46 if loaded then | |
47 push_services[user] = cache.new(max_push_devices):table(); | |
48 -- copy over plain table loaded from disk into our cache | |
49 for k, v in pairs(loaded) do push_services[user][k] = v; end | |
50 else | |
51 push_services[user] = cache.new(max_push_devices):table(); | |
52 end | |
53 end | |
54 return push_services[user], true; | |
55 end | |
56 function api:flush_to_disk(user) | |
57 local plain_table = {}; | |
58 for k, v in pairs(push_services[user]) do plain_table[k] = v; end | |
59 local ok, err = store:set(user, plain_table); | |
60 if not ok then | |
61 module:log("error", "Error writing push notification storage for user '%s': %s", user, tostring(err)); | |
62 return false; | |
63 end | |
64 return true; | |
65 end | |
66 function api:set_identifier(user, push_identifier, data) | |
67 local services = self:get(user); | |
68 services[push_identifier] = data; | |
69 end | |
70 return api; | |
71 end)(); | |
72 | |
73 | |
74 -- Forward declarations, as both functions need to reference each other | |
75 local handle_push_success, handle_push_error; | |
76 | |
77 function handle_push_error(event) | |
78 local stanza = event.stanza; | |
79 local error_type, condition, error_text = stanza:get_error(); | |
80 local node = id2node[stanza.attr.id]; | |
81 local identifier = id2identifier[stanza.attr.id]; | |
82 if node == nil then | |
83 module:log("warn", "Received push error with unrecognised id: %s", stanza.attr.id); | |
84 return false; -- unknown stanza? Ignore for now! | |
85 end | |
86 local from = stanza.attr.from; | |
87 local user_push_services = push_store:get(node); | |
88 local found, changed = false, false; | |
89 | |
90 for push_identifier, _ in pairs(user_push_services) do | |
91 if push_identifier == identifier then | |
92 found = true; | |
93 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then | |
94 push_errors[push_identifier] = push_errors[push_identifier] + 1; | |
95 module:log("info", "Got error <%s:%s:%s> for identifier '%s': " | |
96 .."error count for this identifier is now at %s", error_type, condition, error_text or "", push_identifier, | |
97 tostring(push_errors[push_identifier])); | |
98 if push_errors[push_identifier] >= max_push_errors then | |
99 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier); | |
100 -- remove push settings from sessions | |
101 if host_sessions[node] then | |
102 for _, session in pairs(host_sessions[node].sessions) do | |
103 if session.push_identifier == push_identifier then | |
104 session.push_identifier = nil; | |
105 session.push_settings = nil; | |
106 session.first_hibernated_push = nil; | |
107 -- check for prosody 0.12 mod_smacks | |
108 if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then | |
109 -- restore old smacks watchdog | |
110 session.hibernating_watchdog:cancel(); | |
111 session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback); | |
112 end | |
113 end | |
114 end | |
115 end | |
116 -- save changed global config | |
117 changed = true; | |
118 user_push_services[push_identifier] = nil | |
119 push_errors[push_identifier] = nil; | |
120 -- unhook iq handlers for this identifier (if possible) | |
121 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); | |
122 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); | |
123 id2node[stanza.attr.id] = nil; | |
124 id2identifier[stanza.attr.id] = nil; | |
125 end | |
126 elseif user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type == "wait" then | |
127 module:log("debug", "Got error <%s:%s:%s> for identifier '%s': " | |
128 .."NOT increasing error count for this identifier", error_type, condition, error_text or "", push_identifier); | |
129 else | |
130 module:log("debug", "Unhandled push error <%s:%s:%s> from %s for identifier '%s'", | |
131 error_type, condition, error_text or "", from, push_identifier | |
132 ); | |
133 end | |
134 end | |
135 end | |
136 if changed then | |
137 push_store:flush_to_disk(node); | |
138 elseif not found then | |
139 module:log("warn", "Unable to find matching registration for push error <%s:%s:%s> from %s", error_type, condition, error_text or "", from); | |
140 end | |
141 return true; | |
142 end | |
143 | |
144 function handle_push_success(event) | |
145 local stanza = event.stanza; | |
146 local node = id2node[stanza.attr.id]; | |
147 local identifier = id2identifier[stanza.attr.id]; | |
148 if node == nil then return false; end -- unknown stanza? Ignore for now! | |
149 local from = stanza.attr.from; | |
150 local user_push_services = push_store:get(node); | |
151 | |
152 for push_identifier, _ in pairs(user_push_services) do | |
153 if push_identifier == identifier then | |
154 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then | |
155 push_errors[push_identifier] = 0; | |
156 -- unhook iq handlers for this identifier (if possible) | |
157 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); | |
158 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); | |
159 id2node[stanza.attr.id] = nil; | |
160 id2identifier[stanza.attr.id] = nil; | |
161 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", | |
162 push_identifier, tostring(push_errors[push_identifier]) | |
163 ); | |
164 end | |
165 end | |
166 end | |
167 return true; | |
168 end | |
169 | |
170 -- http://xmpp.org/extensions/xep-0357.html#disco | |
171 local function account_dico_info(event) | |
172 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up(); | |
173 end | |
174 module:hook("account-disco-info", account_dico_info); | |
175 | |
176 -- http://xmpp.org/extensions/xep-0357.html#enabling | |
177 local function push_enable(event) | |
178 local origin, stanza = event.origin, event.stanza; | |
179 local enable = stanza.tags[1]; | |
180 origin.log("debug", "Attempting to enable push notifications"); | |
181 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled | |
182 local push_jid = enable.attr.jid; | |
183 -- SHOULD contain a 'node' attribute | |
184 local push_node = enable.attr.node; | |
185 -- CAN contain a 'include_payload' attribute | |
186 local include_payload = enable.attr.include_payload; | |
187 if not push_jid then | |
188 origin.log("debug", "Push notification enable request missing the 'jid' field"); | |
189 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); | |
190 return true; | |
191 end | |
192 if push_jid == stanza.attr.from then | |
193 origin.log("debug", "Push notification enable request 'jid' field identical to our own"); | |
194 origin.send(st.error_reply(stanza, "modify", "bad-request", "JID must be different from ours")); | |
195 return true; | |
196 end | |
197 local publish_options = enable:get_child("x", "jabber:x:data"); | |
198 if not publish_options then | |
199 -- Could be intentional | |
200 origin.log("debug", "No publish options in request"); | |
201 end | |
202 local push_identifier = push_jid .. "<" .. (push_node or ""); | |
203 local push_service = { | |
204 jid = push_jid; | |
205 node = push_node; | |
206 include_payload = include_payload; | |
207 options = publish_options and st.preserialize(publish_options); | |
208 timestamp = os_time(); | |
209 client_id = origin.client_id; | |
210 resource = not origin.client_id and origin.resource or nil; | |
211 language = stanza.attr["xml:lang"]; | |
212 }; | |
213 local allow_registration = module:fire_event("cloud_notify/registration", { | |
214 origin = origin, stanza = stanza, push_info = push_service; | |
215 }); | |
216 if allow_registration == false then | |
217 return true; -- Assume error reply already sent | |
218 end | |
219 push_store:set_identifier(origin.username, push_identifier, push_service); | |
220 local ok = push_store:flush_to_disk(origin.username); | |
221 if not ok then | |
222 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); | |
223 else | |
224 origin.push_identifier = push_identifier; | |
225 origin.push_settings = push_service; | |
226 origin.first_hibernated_push = nil; | |
227 origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(origin.push_identifier)); | |
228 origin.send(st.reply(stanza)); | |
229 end | |
230 return true; | |
231 end | |
232 module:hook("iq-set/self/"..xmlns_push..":enable", push_enable); | |
233 | |
234 -- http://xmpp.org/extensions/xep-0357.html#disabling | |
235 local function push_disable(event) | |
236 local origin, stanza = event.origin, event.stanza; | |
237 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute | |
238 local push_node = stanza.tags[1].attr.node; -- A 'node' attribute MAY be included | |
239 if not push_jid then | |
240 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); | |
241 return true; | |
242 end | |
243 local user_push_services = push_store:get(origin.username); | |
244 for key, push_info in pairs(user_push_services) do | |
245 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then | |
246 origin.log("info", "Push notifications disabled (%s)", tostring(key)); | |
247 if origin.push_identifier == key then | |
248 origin.push_identifier = nil; | |
249 origin.push_settings = nil; | |
250 origin.first_hibernated_push = nil; | |
251 -- check for prosody 0.12 mod_smacks | |
252 if origin.hibernating_watchdog and origin.original_smacks_callback and origin.original_smacks_timeout then | |
253 -- restore old smacks watchdog | |
254 origin.hibernating_watchdog:cancel(); | |
255 origin.hibernating_watchdog = watchdog.new(origin.original_smacks_timeout, origin.original_smacks_callback); | |
256 end | |
257 end | |
258 user_push_services[key] = nil; | |
259 push_errors[key] = nil; | |
260 for stanza_id, identifier in pairs(id2identifier) do | |
261 if identifier == key then | |
262 module:unhook("iq-error/host/"..stanza_id, handle_push_error); | |
263 module:unhook("iq-result/host/"..stanza_id, handle_push_success); | |
264 id2node[stanza_id] = nil; | |
265 id2identifier[stanza_id] = nil; | |
266 end | |
267 end | |
268 end | |
269 end | |
270 local ok = push_store:flush_to_disk(origin.username); | |
271 if not ok then | |
272 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); | |
273 else | |
274 origin.send(st.reply(stanza)); | |
275 end | |
276 return true; | |
277 end | |
278 module:hook("iq-set/self/"..xmlns_push..":disable", push_disable); | |
279 | |
280 -- urgent stanzas should be delivered without delay | |
281 local function is_urgent(stanza) | |
282 -- TODO | |
283 if stanza.name == "message" then | |
284 if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then | |
285 return true, "jingle call"; | |
286 end | |
287 end | |
288 end | |
289 | |
290 -- is this push a high priority one (this is needed for ios apps not using voip pushes) | |
291 local function is_important(stanza) | |
292 local st_name = stanza and stanza.name or nil; | |
293 if not st_name then return false; end -- nonzas are never important here | |
294 if st_name == "presence" then | |
295 return false; -- same for presences | |
296 elseif st_name == "message" then | |
297 -- unpack carbon copied message stanzas | |
298 local carbon = stanza:find("{urn:xmpp:carbons:2}/{urn:xmpp:forward:0}/{jabber:client}message"); | |
299 local stanza_direction = carbon and stanza:child_with_name("sent") and "out" or "in"; | |
300 if carbon then stanza = carbon; end | |
301 local st_type = stanza.attr.type; | |
302 | |
303 -- headline message are always not important | |
304 if st_type == "headline" then return false; end | |
305 | |
306 -- carbon copied outgoing messages are not important | |
307 if carbon and stanza_direction == "out" then return false; end | |
308 | |
309 -- We can't check for body contents in encrypted messages, so let's treat them as important | |
310 -- Some clients don't even set a body or an empty body for encrypted messages | |
311 | |
312 -- check omemo https://xmpp.org/extensions/inbox/omemo.html | |
313 if stanza:get_child("encrypted", "eu.siacs.conversations.axolotl") or stanza:get_child("encrypted", "urn:xmpp:omemo:0") then return true; end | |
314 | |
315 -- check xep27 pgp https://xmpp.org/extensions/xep-0027.html | |
316 if stanza:get_child("x", "jabber:x:encrypted") then return true; end | |
317 | |
318 -- check xep373 pgp (OX) https://xmpp.org/extensions/xep-0373.html | |
319 if stanza:get_child("openpgp", "urn:xmpp:openpgp:0") then return true; end | |
320 | |
321 -- XEP-0353: Jingle Message Initiation (incoming call request) | |
322 if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then return true; end | |
323 | |
324 local body = stanza:get_child_text("body"); | |
325 | |
326 -- groupchat subjects are not important here | |
327 if st_type == "groupchat" and stanza:get_child_text("subject") then | |
328 return false; | |
329 end | |
330 | |
331 -- empty bodies are not important | |
332 return body ~= nil and body ~= ""; | |
333 end | |
334 return false; -- this stanza wasn't one of the above cases --> it is not important, too | |
335 end | |
336 | |
337 local push_form = dataform { | |
338 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:push:summary"; }; | |
339 { name = "message-count"; type = "text-single"; }; | |
340 { name = "pending-subscription-count"; type = "text-single"; }; | |
341 { name = "last-message-sender"; type = "jid-single"; }; | |
342 { name = "last-message-body"; type = "text-single"; }; | |
343 }; | |
344 | |
345 -- http://xmpp.org/extensions/xep-0357.html#publishing | |
346 local function handle_notify_request(stanza, node, user_push_services, log_push_decline) | |
347 local pushes = 0; | |
348 if not #user_push_services then return pushes end | |
349 | |
350 for push_identifier, push_info in pairs(user_push_services) do | |
351 local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all | |
352 if stanza then | |
353 if not stanza._push_notify then stanza._push_notify = {}; end | |
354 if stanza._push_notify[push_identifier] then | |
355 if log_push_decline then | |
356 module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node)); | |
357 end | |
358 send_push = false; | |
359 end | |
360 stanza._push_notify[push_identifier] = true; | |
361 end | |
362 | |
363 if send_push then | |
364 -- construct push stanza | |
365 local stanza_id = hashes.sha256(random.bytes(8), true); | |
366 local push_notification_payload = st.stanza("notification", { xmlns = xmlns_push }); | |
367 local form_data = { | |
368 -- hardcode to 1 because other numbers are just meaningless (the XEP does not specify *what exactly* to count) | |
369 ["message-count"] = "1"; | |
370 }; | |
371 if stanza and include_sender then | |
372 form_data["last-message-sender"] = stanza.attr.from; | |
373 end | |
374 if stanza and include_body then | |
375 form_data["last-message-body"] = stanza:get_child_text("body"); | |
376 elseif stanza and dummy_body and is_important(stanza) then | |
377 form_data["last-message-body"] = tostring(dummy_body); | |
378 end | |
379 | |
380 push_notification_payload:add_child(push_form:form(form_data)); | |
381 | |
382 local push_publish = st.iq({ to = push_info.jid, from = module.host, type = "set", id = stanza_id }) | |
383 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) | |
384 :tag("publish", { node = push_info.node }) | |
385 :tag("item") | |
386 :add_child(push_notification_payload) | |
387 :up() | |
388 :up(); | |
389 | |
390 if push_info.options then | |
391 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options)); | |
392 end | |
393 -- send out push | |
394 module:log("debug", "Sending %s push notification for %s@%s to %s (%s)", | |
395 form_data["last-message-body"] and "important" or "unimportant", | |
396 node, module.host, push_info.jid, tostring(push_info.node) | |
397 ); | |
398 -- module:log("debug", "PUSH STANZA: %s", tostring(push_publish)); | |
399 local push_event = { | |
400 notification_stanza = push_publish; | |
401 notification_payload = push_notification_payload; | |
402 original_stanza = stanza; | |
403 username = node; | |
404 push_info = push_info; | |
405 push_summary = form_data; | |
406 important = not not form_data["last-message-body"]; | |
407 }; | |
408 | |
409 if module:fire_event("cloud_notify/push", push_event) then | |
410 module:log("debug", "Push was blocked by event handler: %s", push_event.reason or "Unknown reason"); | |
411 else | |
412 -- handle push errors for this node | |
413 if push_errors[push_identifier] == nil then | |
414 push_errors[push_identifier] = 0; | |
415 end | |
416 module:hook("iq-error/host/"..stanza_id, handle_push_error); | |
417 module:hook("iq-result/host/"..stanza_id, handle_push_success); | |
418 id2node[stanza_id] = node; | |
419 id2identifier[stanza_id] = push_identifier; | |
420 module:send(push_publish); | |
421 pushes = pushes + 1; | |
422 end | |
423 end | |
424 end | |
425 return pushes; | |
426 end | |
427 | |
428 -- small helper function to extract relevant push settings | |
429 local function get_push_settings(stanza, session) | |
430 local to = stanza.attr.to; | |
431 local node = to and jid.split(to) or session.username; | |
432 local user_push_services = push_store:get(node); | |
433 return node, user_push_services; | |
434 end | |
435 | |
436 -- publish on offline message | |
437 module:hook("message/offline/handle", function(event) | |
438 local node, user_push_services = get_push_settings(event.stanza, event.origin); | |
439 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); | |
440 handle_notify_request(event.stanza, node, user_push_services, true); | |
441 end, 1); | |
442 | |
443 -- publish on bare groupchat | |
444 -- this picks up MUC messages when there are no devices connected | |
445 module:hook("message/bare/groupchat", function(event) | |
446 module:log("debug", "Invoking cloud handle_notify_request() for bare groupchat stanza"); | |
447 local node, user_push_services = get_push_settings(event.stanza, event.origin); | |
448 handle_notify_request(event.stanza, node, user_push_services, true); | |
449 end, 1); | |
450 | |
451 | |
452 local function process_stanza_queue(queue, session, queue_type) | |
453 if not session.push_identifier then return; end | |
454 local user_push_services = {[session.push_identifier] = session.push_settings}; | |
455 local notified = { unimportant = false; important = false } | |
456 for i=1, #queue do | |
457 local stanza = queue[i]; | |
458 -- fast ignore of already pushed stanzas | |
459 if stanza and not (stanza._push_notify and stanza._push_notify[session.push_identifier]) then | |
460 local node = get_push_settings(stanza, session); | |
461 local stanza_type = "unimportant"; | |
462 if dummy_body and is_important(stanza) then stanza_type = "important"; end | |
463 if not notified[stanza_type] then -- only notify if we didn't try to push for this stanza type already | |
464 -- session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza: %d", i); | |
465 if handle_notify_request(stanza, node, user_push_services, false) ~= 0 then | |
466 if session.hibernating and not session.first_hibernated_push then | |
467 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string) | |
468 -- if the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally, | |
469 -- then record the time of first push in the session for the smack module which will extend its hibernation | |
470 -- timeout based on the value of session.first_hibernated_push | |
471 if not dummy_body or (dummy_body and is_important(stanza)) then | |
472 session.first_hibernated_push = os_time(); | |
473 -- check for prosody 0.12 mod_smacks | |
474 if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then | |
475 -- restore old smacks watchdog (--> the start of our original timeout will be delayed until first push) | |
476 session.hibernating_watchdog:cancel(); | |
477 session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback); | |
478 end | |
479 end | |
480 end | |
481 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type); | |
482 notified[stanza_type] = true | |
483 end | |
484 end | |
485 end | |
486 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted | |
487 end | |
488 end | |
489 | |
490 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) | |
491 local function process_stanza(session, stanza) | |
492 if session.push_identifier then | |
493 session.log("debug", "adding new stanza to push_queue"); | |
494 if not session.push_queue then session.push_queue = {}; end | |
495 local queue = session.push_queue; | |
496 queue[#queue+1] = st.clone(stanza); | |
497 if not session.awaiting_push_timer then -- timer not already running --> start new timer | |
498 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); | |
499 session.awaiting_push_timer = module:add_timer(1.0, function () | |
500 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); | |
501 process_stanza_queue(session.push_queue, session, "push"); | |
502 session.push_queue = {}; -- clean up queue after push | |
503 session.awaiting_push_timer = nil; | |
504 end); | |
505 end | |
506 end | |
507 return stanza; | |
508 end | |
509 | |
510 local function process_smacks_stanza(event) | |
511 local session = event.origin; | |
512 local stanza = event.stanza; | |
513 if not session.push_identifier then | |
514 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)", | |
515 session.push_identifier | |
516 ); | |
517 else | |
518 process_stanza(session, stanza) | |
519 end | |
520 end | |
521 | |
522 -- smacks hibernation is started | |
523 local function hibernate_session(event) | |
524 local session = event.origin; | |
525 local queue = event.queue; | |
526 session.first_hibernated_push = nil; | |
527 if session.push_identifier and session.hibernating_watchdog then -- check for prosody 0.12 mod_smacks | |
528 -- save old watchdog callback and timeout | |
529 session.original_smacks_callback = session.hibernating_watchdog.callback; | |
530 session.original_smacks_timeout = session.hibernating_watchdog.timeout; | |
531 -- cancel old watchdog and create a new watchdog with extended timeout | |
532 session.hibernating_watchdog:cancel(); | |
533 session.hibernating_watchdog = watchdog.new(extended_hibernation_timeout, function() | |
534 session.log("debug", "Push-extended smacks watchdog triggered"); | |
535 if session.original_smacks_callback then | |
536 session.log("debug", "Calling original smacks watchdog handler"); | |
537 session.original_smacks_callback(); | |
538 end | |
539 end); | |
540 end | |
541 -- process unacked stanzas | |
542 process_stanza_queue(queue, session, "smacks"); | |
543 end | |
544 | |
545 -- smacks hibernation is ended | |
546 local function restore_session(event) | |
547 local session = event.resumed; | |
548 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one | |
549 if session.awaiting_push_timer then | |
550 session.awaiting_push_timer:stop(); | |
551 session.awaiting_push_timer = nil; | |
552 end | |
553 session.first_hibernated_push = nil; | |
554 -- the extended smacks watchdog will be canceled by the smacks module, no need to anything here | |
555 end | |
556 end | |
557 | |
558 -- smacks ack is delayed | |
559 local function ack_delayed(event) | |
560 local session = event.origin; | |
561 local queue = event.queue; | |
562 local stanza = event.stanza; | |
563 if not session.push_identifier then return; end | |
564 if stanza then process_stanza(session, stanza); return; end -- don't iterate through smacks queue if we know which stanza triggered this | |
565 for i=1, #queue do | |
566 local queued_stanza = queue[i]; | |
567 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | |
568 process_stanza(session, queued_stanza); | |
569 end | |
570 end | |
571 | |
572 -- archive message added | |
573 local function archive_message_added(event) | |
574 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } | |
575 -- only notify for new mam messages when at least one device is online | |
576 if not event.for_user or not host_sessions[event.for_user] then return; end | |
577 local stanza = event.stanza; | |
578 local user_session = host_sessions[event.for_user].sessions; | |
579 local to = stanza.attr.to; | |
580 to = to and jid.split(to) or event.origin.username; | |
581 | |
582 -- only notify if the stanza destination is the mam user we store it for | |
583 if event.for_user == to then | |
584 local user_push_services = push_store:get(to); | |
585 | |
586 -- Urgent stanzas are time-sensitive (e.g. calls) and should | |
587 -- be pushed immediately to avoid getting stuck in the smacks | |
588 -- queue in case of dead connections, for example | |
589 local is_urgent_stanza, urgent_reason = is_urgent(event.stanza); | |
590 | |
591 local notify_push_services; | |
592 if is_urgent_stanza then | |
593 module:log("debug", "Urgent push for %s (%s)", to, urgent_reason); | |
594 notify_push_services = user_push_services; | |
595 else | |
596 -- only notify nodes with no active sessions (smacks is counted as active and handled separate) | |
597 notify_push_services = {}; | |
598 for identifier, push_info in pairs(user_push_services) do | |
599 local identifier_found = nil; | |
600 for _, session in pairs(user_session) do | |
601 if session.push_identifier == identifier then | |
602 identifier_found = session; | |
603 break; | |
604 end | |
605 end | |
606 if identifier_found then | |
607 identifier_found.log("debug", "Not cloud notifying '%s' of new MAM stanza (session still alive)", identifier); | |
608 else | |
609 notify_push_services[identifier] = push_info; | |
610 end | |
611 end | |
612 end | |
613 | |
614 handle_notify_request(event.stanza, to, notify_push_services, true); | |
615 end | |
616 end | |
617 | |
618 module:hook("smacks-hibernation-start", hibernate_session); | |
619 module:hook("smacks-hibernation-end", restore_session); | |
620 module:hook("smacks-ack-delayed", ack_delayed); | |
621 module:hook("smacks-hibernation-stanza-queued", process_smacks_stanza); | |
622 module:hook("archive-message-added", archive_message_added); | |
623 | |
624 local function send_ping(event) | |
625 local user = event.user; | |
626 local push_services = event.push_services or push_store:get(user); | |
627 module:log("debug", "Handling event 'cloud-notify-ping' for user '%s'", user); | |
628 local retval = handle_notify_request(nil, user, push_services, true); | |
629 module:log("debug", "handle_notify_request() returned %s", tostring(retval)); | |
630 end | |
631 -- can be used by other modules to ping one or more (or all) push endpoints | |
632 module:hook("cloud-notify-ping", send_ping); | |
633 | |
634 module:log("info", "Module loaded"); | |
635 function module.unload() | |
636 module:log("info", "Unloading module"); | |
637 -- cleanup some settings, reloading this module can cause process_smacks_stanza() to stop working otherwise | |
638 for user, _ in pairs(host_sessions) do | |
639 for _, session in pairs(host_sessions[user].sessions) do | |
640 if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end | |
641 session.awaiting_push_timer = nil; | |
642 session.push_queue = nil; | |
643 session.first_hibernated_push = nil; | |
644 -- check for prosody 0.12 mod_smacks | |
645 if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then | |
646 -- restore old smacks watchdog | |
647 session.hibernating_watchdog:cancel(); | |
648 session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback); | |
649 end | |
650 end | |
651 end | |
652 module:log("info", "Module unloaded"); | |
653 end |