Software /
code /
prosody-modules
Comparison
mod_cloud_notify/mod_cloud_notify.lua @ 2609:6ab46ff685d0
mod_cloud_notify: Respect Daniel's business rules and remove endpoints on error
Daniel's business rules can be found here: https://mail.jabber.org/pipermail/standards/2016-February/030925.html
All implementation changes are documented in depth in the file business_rules.markdown
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Sat, 11 Mar 2017 01:42:45 +0100 |
parent | 2395:2e641ab995b3 |
child | 2625:8c6562f16496 |
comparison
equal
deleted
inserted
replaced
2608:362ca94192ee | 2609:6ab46ff685d0 |
---|---|
1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) | 1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) |
2 -- Copyright (C) 2015-2016 Kim Alvefur | 2 -- Copyright (C) 2015-2016 Kim Alvefur |
3 -- Copyright (C) 2017 Thilo Molitor | |
3 -- | 4 -- |
4 -- This file is MIT/X11 licensed. | 5 -- This file is MIT/X11 licensed. |
5 | 6 |
6 local st = require"util.stanza"; | 7 local st = require"util.stanza"; |
7 local jid = require"util.jid"; | 8 local jid = require"util.jid"; |
11 local xmlns_push = "urn:xmpp:push:0"; | 12 local xmlns_push = "urn:xmpp:push:0"; |
12 | 13 |
13 -- configuration | 14 -- configuration |
14 local include_body = module:get_option_boolean("push_notification_with_body", false); | 15 local include_body = module:get_option_boolean("push_notification_with_body", false); |
15 local include_sender = module:get_option_boolean("push_notification_with_sender", false); | 16 local include_sender = module:get_option_boolean("push_notification_with_sender", false); |
16 | 17 local max_push_errors = module:get_option_number("push_max_errors", 50); |
17 -- For keeping state across reloads | 18 |
18 local push_enabled = module:open_store(); | 19 local host_sessions = prosody.hosts[module.host].sessions; |
19 -- TODO map store would be better here | 20 local push_errors = {}; |
21 | |
22 -- For keeping state across reloads while caching reads | |
23 local push_store = (function() | |
24 local store = module:open_store(); | |
25 local push_services = {}; | |
26 local api = {}; | |
27 function api:get(user) | |
28 if not push_services[user] then | |
29 local err; | |
30 push_services[user], err = store:get(user); | |
31 if not push_services[user] and err then | |
32 module:log("warn", "Error reading push notification storage for user '%s': %s", user, tostring(err)); | |
33 push_services[user] = {}; | |
34 return push_services[user], false; | |
35 end | |
36 end | |
37 if not push_services[user] then push_services[user] = {} end | |
38 return push_services[user], true; | |
39 end | |
40 function api:set(user, data) | |
41 push_services[user] = data; | |
42 local ok, err = store:set(user, push_services[user]); | |
43 if not ok then | |
44 module:log("error", "Error writing push notification storage for user '%s': %s", user, tostring(err)); | |
45 return false; | |
46 end | |
47 return true; | |
48 end | |
49 function api:set_identifier(user, push_identifier, data) | |
50 local services = self:get(user); | |
51 services[push_identifier] = data; | |
52 return self:set(user, services); | |
53 end | |
54 return api; | |
55 end)(); | |
56 | |
57 local function handle_push_error(event) | |
58 local stanza = event.stanza; | |
59 local error_type, condition = stanza:get_error(); | |
60 local push_identifier = stanza.attr.id; | |
61 local node = jid.split(stanza.attr.to); | |
62 local from = stanza.attr.from; | |
63 local user_push_services = push_store:get(node); | |
64 | |
65 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then | |
66 push_errors[push_identifier] = push_errors[push_identifier] + 1; | |
67 module:log("info", "Got error of type '%s' (%s) for identifier '%s':" | |
68 .."error count for this identifier is now at %s", error_type, condition, push_identifier, | |
69 tostring(push_errors[push_identifier])); | |
70 if push_errors[push_identifier] >= max_push_errors then | |
71 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier); | |
72 -- remove push settings from sessions | |
73 for _, session in pairs(host_sessions[node].sessions) do | |
74 if session.push_identifier == push_identifier then | |
75 session.push_identifier = nil; | |
76 session.push_settings = nil; | |
77 end | |
78 end | |
79 -- save changed global config | |
80 push_store:set_identifier(node, push_identifier, nil); | |
81 push_errors[push_identifier] = nil; | |
82 -- unhook iq handlers for this identifier | |
83 module:unhook("iq-error/bare/"..push_identifier, handle_push_error); | |
84 module:unhook("iq-result/bare/"..push_identifier, handle_push_success); | |
85 end | |
86 end | |
87 return true; | |
88 end | |
89 | |
90 local function handle_push_success(event) | |
91 local stanza = event.stanza; | |
92 local push_identifier = stanza.attr.id; | |
93 local node = jid.split(stanza.attr.to); | |
94 local from = stanza.attr.from; | |
95 local user_push_services = push_store:get(node); | |
96 | |
97 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] then | |
98 push_errors[push_identifier] = 0; | |
99 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s", push_identifier, tostring(push_errors[push_identifier])); | |
100 end | |
101 return true; | |
102 end | |
20 | 103 |
21 -- http://xmpp.org/extensions/xep-0357.html#disco | 104 -- http://xmpp.org/extensions/xep-0357.html#disco |
22 module:hook("account-disco-info", function(event) | 105 local function account_dico_info(event) |
23 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up(); | 106 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up(); |
24 end); | 107 end |
108 module:hook("account-disco-info", account_dico_info); | |
25 | 109 |
26 -- http://xmpp.org/extensions/xep-0357.html#enabling | 110 -- http://xmpp.org/extensions/xep-0357.html#enabling |
27 module:hook("iq-set/self/"..xmlns_push..":enable", function (event) | 111 local function push_enable(event) |
28 local origin, stanza = event.origin, event.stanza; | 112 local origin, stanza = event.origin, event.stanza; |
29 local enable = stanza.tags[1]; | 113 local enable = stanza.tags[1]; |
30 origin.log("debug", "Attempting to enable push notifications"); | 114 origin.log("debug", "Attempting to enable push notifications"); |
31 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled | 115 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled |
32 local push_jid = enable.attr.jid; | 116 local push_jid = enable.attr.jid; |
40 local publish_options = enable:get_child("x", "jabber:x:data"); | 124 local publish_options = enable:get_child("x", "jabber:x:data"); |
41 if not publish_options then | 125 if not publish_options then |
42 -- Could be intentional | 126 -- Could be intentional |
43 origin.log("debug", "No publish options in request"); | 127 origin.log("debug", "No publish options in request"); |
44 end | 128 end |
45 local user_push_services, rerr = push_enabled:get(origin.username); | 129 local push_identifier = push_jid .. "<" .. (push_node or ""); |
46 if not user_push_services then | 130 local push_service = { |
47 if rerr then | |
48 module:log("warn", "Error reading push notification storage: %s", rerr); | |
49 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); | |
50 return true; | |
51 end | |
52 user_push_services = {}; | |
53 end | |
54 user_push_services[push_jid .. "<" .. (push_node or "")] = { | |
55 jid = push_jid; | 131 jid = push_jid; |
56 node = push_node; | 132 node = push_node; |
57 count = 0; | 133 count = 0; |
58 options = publish_options and st.preserialize(publish_options); | 134 options = publish_options and st.preserialize(publish_options); |
59 }; | 135 }; |
60 local ok, err = push_enabled:set(origin.username, user_push_services); | 136 local ok = push_store:set_identifier(origin.username, push_identifier, push_service); |
61 if not ok then | 137 if not ok then |
62 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); | 138 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); |
63 else | 139 else |
64 origin.log("info", "Push notifications enabled"); | 140 origin.push_identifier = push_identifier; |
141 origin.push_settings = push_service; | |
142 origin.log("info", "Push notifications enabled (%s)", tostring(origin.push_identifier)); | |
65 origin.send(st.reply(stanza)); | 143 origin.send(st.reply(stanza)); |
66 end | 144 end |
67 return true; | 145 return true; |
68 end); | 146 end |
147 module:hook("iq-set/self/"..xmlns_push..":enable", push_enable); | |
69 | 148 |
70 -- http://xmpp.org/extensions/xep-0357.html#disabling | 149 -- http://xmpp.org/extensions/xep-0357.html#disabling |
71 module:hook("iq-set/self/"..xmlns_push..":disable", function (event) | 150 local function push_disable(event) |
72 local origin, stanza = event.origin, event.stanza; | 151 local origin, stanza = event.origin, event.stanza; |
73 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute | 152 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute |
74 local push_node = stanza.tags[1].attr.node; -- A 'node' attribute MAY be included | 153 local push_node = stanza.tags[1].attr.node; -- A 'node' attribute MAY be included |
75 if not push_jid then | 154 if not push_jid then |
76 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); | 155 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); |
77 return true; | 156 return true; |
78 end | 157 end |
79 local user_push_services = push_enabled:get(origin.username); | 158 local user_push_services = push_store:get(origin.username); |
80 for key, push_info in pairs(user_push_services) do | 159 for key, push_info in pairs(user_push_services) do |
81 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then | 160 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then |
161 origin.log("info", "Push notifications disabled (%s)", tostring(key)); | |
162 if origin.push_identifier == key then | |
163 origin.push_identifier = nil; | |
164 origin.push_settings = nil; | |
165 end | |
82 user_push_services[key] = nil; | 166 user_push_services[key] = nil; |
83 end | 167 push_errors[key] = nil; |
84 end | 168 module:unhook("iq-error/bare/"..key, handle_push_error); |
85 origin.send(st.reply(stanza)); | 169 module:unhook("iq-result/bare/"..key, handle_push_success); |
170 end | |
171 end | |
172 local ok = push_store:set(origin.username, user_push_services); | |
173 if not ok then | |
174 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); | |
175 else | |
176 origin.send(st.reply(stanza)); | |
177 end | |
86 return true; | 178 return true; |
87 end); | 179 end |
180 module:hook("iq-set/self/"..xmlns_push..":disable", push_disable); | |
88 | 181 |
89 local push_form = dataform { | 182 local push_form = dataform { |
90 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:push:summary"; }; | 183 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:push:summary"; }; |
91 { name = "message-count"; type = "text-single"; }; | 184 { name = "message-count"; type = "text-single"; }; |
92 { name = "pending-subscription-count"; type = "text-single"; }; | 185 { name = "pending-subscription-count"; type = "text-single"; }; |
93 { name = "last-message-sender"; type = "jid-single"; }; | 186 { name = "last-message-sender"; type = "jid-single"; }; |
94 { name = "last-message-body"; type = "text-single"; }; | 187 { name = "last-message-body"; type = "text-single"; }; |
95 }; | 188 }; |
96 | 189 |
97 -- http://xmpp.org/extensions/xep-0357.html#publishing | 190 -- http://xmpp.org/extensions/xep-0357.html#publishing |
98 local function handle_notify_request(origin, stanza) | 191 local function handle_notify_request(stanza, node, user_push_services) |
99 local to = stanza.attr.to; | 192 if not user_push_services or not #user_push_services then return end |
100 local node = to and jid.split(to) or origin.username; | 193 |
101 local user_push_services = push_enabled:get(node); | 194 if stanza and stanza._notify then |
102 if not user_push_services then return end | 195 module:log("debug", "Already sent push notification to %s@%s for this stanza, not doing it again", node, module.host); |
103 | 196 return; |
104 for _, push_info in pairs(user_push_services) do | 197 end |
198 if stanza then | |
199 stanza._notify = true; | |
200 end | |
201 | |
202 for push_identifier, push_info in pairs(user_push_services) do | |
203 -- increment count and save it | |
105 push_info.count = push_info.count + 1; | 204 push_info.count = push_info.count + 1; |
106 local push_jid, push_node = push_info.jid, push_info.node; | 205 push_store:set_identifier(node, push_identifier, push_info); |
107 local push_publish = st.iq({ to = push_jid, from = node .. "@" .. module.host, type = "set", id = "push" }) | 206 -- construct push stanza |
207 local push_publish = st.iq({ to = push_info.jid, from = node .. "@" .. module.host, type = "set", id = push_identifier }) | |
108 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) | 208 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) |
109 :tag("publish", { node = push_node }) | 209 :tag("publish", { node = push_info.node }) |
110 :tag("item") | 210 :tag("item") |
111 :tag("notification", { xmlns = xmlns_push }); | 211 :tag("notification", { xmlns = xmlns_push }); |
112 local form_data = { | 212 local form_data = { |
113 ["message-count"] = tostring(push_info.count); | 213 ["message-count"] = tostring(push_info.count); |
114 }; | 214 }; |
115 if include_sender then | 215 if stanza and include_sender then |
116 form_data["last-message-sender"] = stanza.attr.from; | 216 form_data["last-message-sender"] = stanza.attr.from; |
117 end | 217 end |
118 if include_body then | 218 if stanza and include_body then |
119 form_data["last-message-body"] = stanza:get_child_text("body"); | 219 form_data["last-message-body"] = stanza:get_child_text("body"); |
120 end | 220 end |
121 push_publish:add_child(push_form:form(form_data)); | 221 push_publish:add_child(push_form:form(form_data)); |
122 push_publish:up(); -- / notification | 222 push_publish:up(); -- / notification |
123 push_publish:up(); -- / publish | 223 push_publish:up(); -- / publish |
124 push_publish:up(); -- / pubsub | 224 push_publish:up(); -- / pubsub |
125 if push_info.options then | 225 if push_info.options then |
126 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options)); | 226 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options)); |
127 end | 227 end |
128 module:log("debug", "Sending push notification for %s@%s to %s", node, module.host, push_jid); | 228 -- send out push |
229 module:log("debug", "Sending push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node)); | |
230 -- handle push errors for this node | |
231 if push_errors[push_identifier] == nil then | |
232 push_errors[push_identifier] = 0; | |
233 module:hook("iq-error/bare/"..push_identifier, handle_push_error); | |
234 module:hook("iq-result/bare/"..push_identifier, handle_push_success); | |
235 end | |
129 module:send(push_publish); | 236 module:send(push_publish); |
130 end | 237 end |
131 push_enabled:set(node, user_push_services); | 238 end |
239 | |
240 -- small helper function to extract relevant push settings | |
241 local function get_push_settings(stanza, session) | |
242 local to = stanza.attr.to; | |
243 local node = to and jid.split(to) or session.username; | |
244 local user_push_services = push_store:get(node); | |
245 return node, user_push_services; | |
132 end | 246 end |
133 | 247 |
134 -- publish on offline message | 248 -- publish on offline message |
135 module:hook("message/offline/handle", function(event) | 249 module:hook("message/offline/handle", function(event) |
136 if event.stanza._notify then | 250 local node, user_push_services = get_push_settings(event.stanza, event.origin); |
137 event.stanza._notify = nil; | 251 return handle_notify_request(event.stanza, node, user_push_services); |
138 return; | |
139 end | |
140 return handle_notify_request(event.origin, event.stanza); | |
141 end, 1); | 252 end, 1); |
142 | 253 |
143 -- publish on unacked smacks message | 254 -- publish on unacked smacks message |
144 local function process_new_stanza(stanza, session) | 255 local function process_smacks_stanza(stanza, session) |
145 if getmetatable(stanza) ~= st.stanza_mt then | 256 if session.push_identifier then |
146 return stanza; -- Things we don't want to touch | 257 session.log("debug", "Invoking cloud handle_notify_request for smacks queued stanza..."); |
147 end | 258 local user_push_services = {[session.push_identifier] = session.push_settings}; |
148 if stanza.name == "message" and stanza.attr.xmlns == nil and | 259 local node = get_push_settings(stanza, session); |
149 ( stanza.attr.type == "chat" or ( stanza.attr.type or "normal" ) == "normal" ) and | 260 handle_notify_request(stanza, node, user_push_services); |
150 -- not already notified via cloud | |
151 not stanza._notify then | |
152 stanza._notify = true; | |
153 session.log("debug", "Invoking cloud handle_notify_request for new smacks hibernated stanza..."); | |
154 handle_notify_request(session, stanza) | |
155 end | 261 end |
156 return stanza; | 262 return stanza; |
157 end | 263 end |
158 | 264 |
159 -- smacks hibernation is started | 265 -- smacks hibernation is started |
160 local function hibernate_session(event) | 266 local function hibernate_session(event) |
161 local session = event.origin; | 267 local session = event.origin; |
162 local queue = event.queue; | 268 local queue = event.queue; |
163 -- process unacked stanzas | 269 -- process unacked stanzas |
164 for i=1,#queue do | 270 for i=1,#queue do |
165 process_new_stanza(queue[i], session); | 271 process_smacks_stanza(queue[i], session); |
166 end | 272 end |
167 -- process future unacked (hibernated) stanzas | 273 -- process future unacked (hibernated) stanzas |
168 filters.add_filter(session, "stanzas/out", process_new_stanza); | 274 filters.add_filter(session, "stanzas/out", process_smacks_stanza); |
169 end | 275 end |
170 | 276 |
171 -- smacks hibernation is ended | 277 -- smacks hibernation is ended |
172 local function restore_session(event) | 278 local function restore_session(event) |
173 local session = event.origin; | 279 local session = event.resumed; |
174 filters.remove_filter(session, "stanzas/out", process_new_stanza); | 280 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one |
281 filters.remove_filter(session, "stanzas/out", process_smacks_stanza); | |
282 -- this means the counter of outstanding push messages can be reset as well | |
283 if session.push_settings then | |
284 session.push_settings.count = 0; | |
285 push_store:set_identifier(session.username, session.push_identifier, session.push_settings); | |
286 end | |
287 end | |
175 end | 288 end |
176 | 289 |
177 -- smacks ack is delayed | 290 -- smacks ack is delayed |
178 local function ack_delayed(event) | 291 local function ack_delayed(event) |
179 local session = event.origin; | 292 local session = event.origin; |
180 local queue = event.queue; | 293 local queue = event.queue; |
181 -- process unacked stanzas (process_new_stanza will only send push requests for new messages) | 294 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) |
182 for i=1,#queue do | 295 for i=1,#queue do |
183 process_new_stanza(queue[i], session); | 296 process_smacks_stanza(queue[i], session); |
297 end | |
298 end | |
299 | |
300 -- archive message added | |
301 local function archive_message_added(event) | |
302 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } | |
303 -- only notify for new mam messages when at least one device is online | |
304 if not event.for_user or not host_sessions[event.for_user] then return; end | |
305 local stanza = event.stanza; | |
306 local user_session = host_sessions[event.for_user].sessions; | |
307 local to = stanza.attr.to; | |
308 to = to and jid.split(to) or event.origin.username; | |
309 | |
310 -- only notify if the stanza destination is the mam user we store it for | |
311 if event.for_user == to then | |
312 local user_push_services = push_store:get(to); | |
313 if not #user_push_services then return end | |
314 | |
315 -- only notify nodes with no active sessions (smacks is counted as active and handled separately) | |
316 local notify_push_sevices = {}; | |
317 for identifier, push_info in pairs(user_push_services) do | |
318 local identifier_found = nil; | |
319 for _, session in pairs(user_session) do | |
320 -- module:log("debug", "searching for '%s': identifier '%s' for session %s", tostring(identifier), tostring(session.push_identifier), tostring(session.full_jid)); | |
321 if session.push_identifier == identifier then | |
322 identifier_found = session; | |
323 break; | |
324 end | |
325 end | |
326 if identifier_found then | |
327 identifier_found.log("debug", "Not notifying '%s' of new MAM stanza (session still alive)", identifier); | |
328 else | |
329 notify_push_sevices[identifier] = push_info; | |
330 end | |
331 end | |
332 | |
333 return handle_notify_request(event.stanza, to, notify_push_sevices); | |
184 end | 334 end |
185 end | 335 end |
186 | 336 |
187 module:hook("smacks-hibernation-start", hibernate_session); | 337 module:hook("smacks-hibernation-start", hibernate_session); |
188 module:hook("smacks-hibernation-end", restore_session); | 338 module:hook("smacks-hibernation-end", restore_session); |
189 module:hook("smacks-ack-delayed", ack_delayed); | 339 module:hook("smacks-ack-delayed", ack_delayed); |
190 | 340 module:hook("archive-message-added", archive_message_added); |
191 | 341 |
192 module:hook("message/offline/broadcast", function(event) | 342 local function send_ping(event) |
193 local origin = event.origin; | 343 local user = event.user; |
194 local user_push_services = push_enabled:get(origin.username); | 344 local user_push_services = push_store:get(user); |
195 if not user_push_services then return end | 345 local push_services = event.push_services or user_push_services; |
196 | 346 return handle_notify_request(nil, user, push_services); |
197 for _, push_info in pairs(user_push_services) do | 347 end |
198 if push_info then | 348 -- can be used by other modules to ping one or more (or all) push endpoints |
199 push_info.count = 0; | 349 module:hook("cloud-notify-ping", send_ping); |
200 end | 350 |
201 end | 351 -- TODO: this has to be done on first connect not on offline broadcast, else the counter will be incorrect |
202 push_enabled:set(origin.username, user_push_services); | 352 -- TODO: it seems this is already done, so this could be safely removed, couldn't it? |
203 end, 1); | 353 -- module:hook("message/offline/broadcast", function(event) |
354 -- local origin = event.origin; | |
355 -- local user_push_services = push_store:get(origin.username); | |
356 -- if not #user_push_services then return end | |
357 -- | |
358 -- for _, push_info in pairs(user_push_services) do | |
359 -- if push_info then | |
360 -- push_info.count = 0; | |
361 -- end | |
362 -- end | |
363 -- push_store:set(origin.username, user_push_services); | |
364 -- end, 1); | |
365 | |
366 function module.unload() | |
367 module:unhook("account-disco-info", account_dico_info); | |
368 module:unhook("iq-set/self/"..xmlns_push..":enable", push_enable); | |
369 module:unhook("iq-set/self/"..xmlns_push..":disable", push_disable); | |
370 | |
371 module:unhook("smacks-hibernation-start", hibernate_session); | |
372 module:unhook("smacks-hibernation-end", restore_session); | |
373 module:unhook("smacks-ack-delayed", ack_delayed); | |
374 module:unhook("archive-message-added", archive_message_added); | |
375 module:unhook("cloud-notify-ping", send_ping); | |
376 | |
377 for push_identifier, _ in pairs(push_errors) do | |
378 module:hook("iq-error/bare/"..push_identifier, handle_push_error); | |
379 module:hook("iq-result/bare/"..push_identifier, handle_push_success); | |
380 end | |
381 end |