Comparison

mod_cloud_notify/mod_cloud_notify.lua @ 2609:6ab46ff685d0

mod_cloud_notify: Respect Daniel's business rules and remove endpoints on error. Daniel's business rules can be found here: https://mail.jabber.org/pipermail/standards/2016-February/030925.html (all implementation changes are documented in depth in the file business_rules.markdown).
author tmolitor <thilo@eightysoft.de>
date Sat, 11 Mar 2017 01:42:45 +0100
parent 2395:2e641ab995b3
child 2625:8c6562f16496
comparison
equal deleted inserted replaced
2608:362ca94192ee 2609:6ab46ff685d0
1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) 1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections)
2 -- Copyright (C) 2015-2016 Kim Alvefur 2 -- Copyright (C) 2015-2016 Kim Alvefur
3 -- Copyright (C) 2017 Thilo Molitor
3 -- 4 --
4 -- This file is MIT/X11 licensed. 5 -- This file is MIT/X11 licensed.
5 6
6 local st = require"util.stanza"; 7 local st = require"util.stanza";
7 local jid = require"util.jid"; 8 local jid = require"util.jid";
11 local xmlns_push = "urn:xmpp:push:0"; 12 local xmlns_push = "urn:xmpp:push:0";
12 13
13 -- configuration 14 -- configuration
14 local include_body = module:get_option_boolean("push_notification_with_body", false); 15 local include_body = module:get_option_boolean("push_notification_with_body", false);
15 local include_sender = module:get_option_boolean("push_notification_with_sender", false); 16 local include_sender = module:get_option_boolean("push_notification_with_sender", false);
16 17 local max_push_errors = module:get_option_number("push_max_errors", 50);
17 -- For keeping state across reloads 18
18 local push_enabled = module:open_store(); 19 local host_sessions = prosody.hosts[module.host].sessions;
19 -- TODO map store would be better here 20 local push_errors = {};
21
22 -- For keeping state across reloads while caching reads
23 local push_store = (function()
24 local store = module:open_store();
25 local push_services = {};
26 local api = {};
27 function api:get(user)
28 if not push_services[user] then
29 local err;
30 push_services[user], err = store:get(user);
31 if not push_services[user] and err then
32 module:log("warn", "Error reading push notification storage for user '%s': %s", user, tostring(err));
33 push_services[user] = {};
34 return push_services[user], false;
35 end
36 end
37 if not push_services[user] then push_services[user] = {} end
38 return push_services[user], true;
39 end
40 function api:set(user, data)
41 push_services[user] = data;
42 local ok, err = store:set(user, push_services[user]);
43 if not ok then
44 module:log("error", "Error writing push notification storage for user '%s': %s", user, tostring(err));
45 return false;
46 end
47 return true;
48 end
49 function api:set_identifier(user, push_identifier, data)
50 local services = self:get(user);
51 services[push_identifier] = data;
52 return self:set(user, services);
53 end
54 return api;
55 end)();
56
57 local function handle_push_error(event)
58 local stanza = event.stanza;
59 local error_type, condition = stanza:get_error();
60 local push_identifier = stanza.attr.id;
61 local node = jid.split(stanza.attr.to);
62 local from = stanza.attr.from;
63 local user_push_services = push_store:get(node);
64
65 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and error_type ~= "wait" then
66 push_errors[push_identifier] = push_errors[push_identifier] + 1;
67 module:log("info", "Got error of type '%s' (%s) for identifier '%s':"
68 .."error count for this identifier is now at %s", error_type, condition, push_identifier,
69 tostring(push_errors[push_identifier]));
70 if push_errors[push_identifier] >= max_push_errors then
71 module:log("warn", "Disabling push notifications for identifier '%s'", push_identifier);
72 -- remove push settings from sessions
73 for _, session in pairs(host_sessions[node].sessions) do
74 if session.push_identifier == push_identifier then
75 session.push_identifier = nil;
76 session.push_settings = nil;
77 end
78 end
79 -- save changed global config
80 push_store:set_identifier(node, push_identifier, nil);
81 push_errors[push_identifier] = nil;
82 -- unhook iq handlers for this identifier
83 module:unhook("iq-error/bare/"..push_identifier, handle_push_error);
84 module:unhook("iq-result/bare/"..push_identifier, handle_push_success);
85 end
86 end
87 return true;
88 end
89
90 local function handle_push_success(event)
91 local stanza = event.stanza;
92 local push_identifier = stanza.attr.id;
93 local node = jid.split(stanza.attr.to);
94 local from = stanza.attr.from;
95 local user_push_services = push_store:get(node);
96
97 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] then
98 push_errors[push_identifier] = 0;
99 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s", push_identifier, tostring(push_errors[push_identifier]));
100 end
101 return true;
102 end
20 103
21 -- http://xmpp.org/extensions/xep-0357.html#disco 104 -- http://xmpp.org/extensions/xep-0357.html#disco
22 module:hook("account-disco-info", function(event) 105 local function account_dico_info(event)
23 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up(); 106 (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up();
24 end); 107 end
108 module:hook("account-disco-info", account_dico_info);
25 109
26 -- http://xmpp.org/extensions/xep-0357.html#enabling 110 -- http://xmpp.org/extensions/xep-0357.html#enabling
27 module:hook("iq-set/self/"..xmlns_push..":enable", function (event) 111 local function push_enable(event)
28 local origin, stanza = event.origin, event.stanza; 112 local origin, stanza = event.origin, event.stanza;
29 local enable = stanza.tags[1]; 113 local enable = stanza.tags[1];
30 origin.log("debug", "Attempting to enable push notifications"); 114 origin.log("debug", "Attempting to enable push notifications");
31 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled 115 -- MUST contain a 'jid' attribute of the XMPP Push Service being enabled
32 local push_jid = enable.attr.jid; 116 local push_jid = enable.attr.jid;
40 local publish_options = enable:get_child("x", "jabber:x:data"); 124 local publish_options = enable:get_child("x", "jabber:x:data");
41 if not publish_options then 125 if not publish_options then
42 -- Could be intentional 126 -- Could be intentional
43 origin.log("debug", "No publish options in request"); 127 origin.log("debug", "No publish options in request");
44 end 128 end
45 local user_push_services, rerr = push_enabled:get(origin.username); 129 local push_identifier = push_jid .. "<" .. (push_node or "");
46 if not user_push_services then 130 local push_service = {
47 if rerr then
48 module:log("warn", "Error reading push notification storage: %s", rerr);
49 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
50 return true;
51 end
52 user_push_services = {};
53 end
54 user_push_services[push_jid .. "<" .. (push_node or "")] = {
55 jid = push_jid; 131 jid = push_jid;
56 node = push_node; 132 node = push_node;
57 count = 0; 133 count = 0;
58 options = publish_options and st.preserialize(publish_options); 134 options = publish_options and st.preserialize(publish_options);
59 }; 135 };
60 local ok, err = push_enabled:set(origin.username, user_push_services); 136 local ok = push_store:set_identifier(origin.username, push_identifier, push_service);
61 if not ok then 137 if not ok then
62 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); 138 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
63 else 139 else
64 origin.log("info", "Push notifications enabled"); 140 origin.push_identifier = push_identifier;
141 origin.push_settings = push_service;
142 origin.log("info", "Push notifications enabled (%s)", tostring(origin.push_identifier));
65 origin.send(st.reply(stanza)); 143 origin.send(st.reply(stanza));
66 end 144 end
67 return true; 145 return true;
68 end); 146 end
147 module:hook("iq-set/self/"..xmlns_push..":enable", push_enable);
69 148
70 -- http://xmpp.org/extensions/xep-0357.html#disabling 149 -- http://xmpp.org/extensions/xep-0357.html#disabling
71 module:hook("iq-set/self/"..xmlns_push..":disable", function (event) 150 local function push_disable(event)
72 local origin, stanza = event.origin, event.stanza; 151 local origin, stanza = event.origin, event.stanza;
73 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute 152 local push_jid = stanza.tags[1].attr.jid; -- MUST include a 'jid' attribute
74 local push_node = stanza.tags[1].attr.node; -- A 'node' attribute MAY be included 153 local push_node = stanza.tags[1].attr.node; -- A 'node' attribute MAY be included
75 if not push_jid then 154 if not push_jid then
76 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid")); 155 origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing jid"));
77 return true; 156 return true;
78 end 157 end
79 local user_push_services = push_enabled:get(origin.username); 158 local user_push_services = push_store:get(origin.username);
80 for key, push_info in pairs(user_push_services) do 159 for key, push_info in pairs(user_push_services) do
81 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then 160 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then
161 origin.log("info", "Push notifications disabled (%s)", tostring(key));
162 if origin.push_identifier == key then
163 origin.push_identifier = nil;
164 origin.push_settings = nil;
165 end
82 user_push_services[key] = nil; 166 user_push_services[key] = nil;
83 end 167 push_errors[key] = nil;
84 end 168 module:unhook("iq-error/bare/"..key, handle_push_error);
85 origin.send(st.reply(stanza)); 169 module:unhook("iq-result/bare/"..key, handle_push_success);
170 end
171 end
172 local ok = push_store:set(origin.username, user_push_services);
173 if not ok then
174 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
175 else
176 origin.send(st.reply(stanza));
177 end
86 return true; 178 return true;
87 end); 179 end
180 module:hook("iq-set/self/"..xmlns_push..":disable", push_disable);
88 181
89 local push_form = dataform { 182 local push_form = dataform {
90 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:push:summary"; }; 183 { name = "FORM_TYPE"; type = "hidden"; value = "urn:xmpp:push:summary"; };
91 { name = "message-count"; type = "text-single"; }; 184 { name = "message-count"; type = "text-single"; };
92 { name = "pending-subscription-count"; type = "text-single"; }; 185 { name = "pending-subscription-count"; type = "text-single"; };
93 { name = "last-message-sender"; type = "jid-single"; }; 186 { name = "last-message-sender"; type = "jid-single"; };
94 { name = "last-message-body"; type = "text-single"; }; 187 { name = "last-message-body"; type = "text-single"; };
95 }; 188 };
96 189
97 -- http://xmpp.org/extensions/xep-0357.html#publishing 190 -- http://xmpp.org/extensions/xep-0357.html#publishing
98 local function handle_notify_request(origin, stanza) 191 local function handle_notify_request(stanza, node, user_push_services)
99 local to = stanza.attr.to; 192 if not user_push_services or not #user_push_services then return end
100 local node = to and jid.split(to) or origin.username; 193
101 local user_push_services = push_enabled:get(node); 194 if stanza and stanza._notify then
102 if not user_push_services then return end 195 module:log("debug", "Already sent push notification to %s@%s for this stanza, not doing it again", node, module.host);
103 196 return;
104 for _, push_info in pairs(user_push_services) do 197 end
198 if stanza then
199 stanza._notify = true;
200 end
201
202 for push_identifier, push_info in pairs(user_push_services) do
203 -- increment count and save it
105 push_info.count = push_info.count + 1; 204 push_info.count = push_info.count + 1;
106 local push_jid, push_node = push_info.jid, push_info.node; 205 push_store:set_identifier(node, push_identifier, push_info);
107 local push_publish = st.iq({ to = push_jid, from = node .. "@" .. module.host, type = "set", id = "push" }) 206 -- construct push stanza
207 local push_publish = st.iq({ to = push_info.jid, from = node .. "@" .. module.host, type = "set", id = push_identifier })
108 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" }) 208 :tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" })
109 :tag("publish", { node = push_node }) 209 :tag("publish", { node = push_info.node })
110 :tag("item") 210 :tag("item")
111 :tag("notification", { xmlns = xmlns_push }); 211 :tag("notification", { xmlns = xmlns_push });
112 local form_data = { 212 local form_data = {
113 ["message-count"] = tostring(push_info.count); 213 ["message-count"] = tostring(push_info.count);
114 }; 214 };
115 if include_sender then 215 if stanza and include_sender then
116 form_data["last-message-sender"] = stanza.attr.from; 216 form_data["last-message-sender"] = stanza.attr.from;
117 end 217 end
118 if include_body then 218 if stanza and include_body then
119 form_data["last-message-body"] = stanza:get_child_text("body"); 219 form_data["last-message-body"] = stanza:get_child_text("body");
120 end 220 end
121 push_publish:add_child(push_form:form(form_data)); 221 push_publish:add_child(push_form:form(form_data));
122 push_publish:up(); -- / notification 222 push_publish:up(); -- / notification
123 push_publish:up(); -- / publish 223 push_publish:up(); -- / publish
124 push_publish:up(); -- / pubsub 224 push_publish:up(); -- / pubsub
125 if push_info.options then 225 if push_info.options then
126 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options)); 226 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options));
127 end 227 end
128 module:log("debug", "Sending push notification for %s@%s to %s", node, module.host, push_jid); 228 -- send out push
229 module:log("debug", "Sending push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
230 -- handle push errors for this node
231 if push_errors[push_identifier] == nil then
232 push_errors[push_identifier] = 0;
233 module:hook("iq-error/bare/"..push_identifier, handle_push_error);
234 module:hook("iq-result/bare/"..push_identifier, handle_push_success);
235 end
129 module:send(push_publish); 236 module:send(push_publish);
130 end 237 end
131 push_enabled:set(node, user_push_services); 238 end
239
240 -- small helper function to extract relevant push settings
241 local function get_push_settings(stanza, session)
242 local to = stanza.attr.to;
243 local node = to and jid.split(to) or session.username;
244 local user_push_services = push_store:get(node);
245 return node, user_push_services;
132 end 246 end
133 247
134 -- publish on offline message 248 -- publish on offline message
135 module:hook("message/offline/handle", function(event) 249 module:hook("message/offline/handle", function(event)
136 if event.stanza._notify then 250 local node, user_push_services = get_push_settings(event.stanza, event.origin);
137 event.stanza._notify = nil; 251 return handle_notify_request(event.stanza, node, user_push_services);
138 return;
139 end
140 return handle_notify_request(event.origin, event.stanza);
141 end, 1); 252 end, 1);
142 253
143 -- publish on unacked smacks message 254 -- publish on unacked smacks message
144 local function process_new_stanza(stanza, session) 255 local function process_smacks_stanza(stanza, session)
145 if getmetatable(stanza) ~= st.stanza_mt then 256 if session.push_identifier then
146 return stanza; -- Things we don't want to touch 257 session.log("debug", "Invoking cloud handle_notify_request for smacks queued stanza...");
147 end 258 local user_push_services = {[session.push_identifier] = session.push_settings};
148 if stanza.name == "message" and stanza.attr.xmlns == nil and 259 local node = get_push_settings(stanza, session);
149 ( stanza.attr.type == "chat" or ( stanza.attr.type or "normal" ) == "normal" ) and 260 handle_notify_request(stanza, node, user_push_services);
150 -- not already notified via cloud
151 not stanza._notify then
152 stanza._notify = true;
153 session.log("debug", "Invoking cloud handle_notify_request for new smacks hibernated stanza...");
154 handle_notify_request(session, stanza)
155 end 261 end
156 return stanza; 262 return stanza;
157 end 263 end
158 264
159 -- smacks hibernation is started 265 -- smacks hibernation is started
160 local function hibernate_session(event) 266 local function hibernate_session(event)
161 local session = event.origin; 267 local session = event.origin;
162 local queue = event.queue; 268 local queue = event.queue;
163 -- process unacked stanzas 269 -- process unacked stanzas
164 for i=1,#queue do 270 for i=1,#queue do
165 process_new_stanza(queue[i], session); 271 process_smacks_stanza(queue[i], session);
166 end 272 end
167 -- process future unacked (hibernated) stanzas 273 -- process future unacked (hibernated) stanzas
168 filters.add_filter(session, "stanzas/out", process_new_stanza); 274 filters.add_filter(session, "stanzas/out", process_smacks_stanza);
169 end 275 end
170 276
171 -- smacks hibernation is ended 277 -- smacks hibernation is ended
172 local function restore_session(event) 278 local function restore_session(event)
173 local session = event.origin; 279 local session = event.resumed;
174 filters.remove_filter(session, "stanzas/out", process_new_stanza); 280 if session then -- older smacks module versions send only the "intermediate" session in event.origin and no event.resumed one
281 filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
282 -- this means the counter of outstanding push messages can be reset as well
283 if session.push_settings then
284 session.push_settings.count = 0;
285 push_store:set_identifier(session.username, session.push_identifier, session.push_settings);
286 end
287 end
175 end 288 end
176 289
177 -- smacks ack is delayed 290 -- smacks ack is delayed
178 local function ack_delayed(event) 291 local function ack_delayed(event)
179 local session = event.origin; 292 local session = event.origin;
180 local queue = event.queue; 293 local queue = event.queue;
181 -- process unacked stanzas (process_new_stanza will only send push requests for new messages) 294 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
182 for i=1,#queue do 295 for i=1,#queue do
183 process_new_stanza(queue[i], session); 296 process_smacks_stanza(queue[i], session);
297 end
298 end
299
300 -- archive message added
301 local function archive_message_added(event)
302 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
303 -- only notify for new mam messages when at least one device is online
304 if not event.for_user or not host_sessions[event.for_user] then return; end
305 local stanza = event.stanza;
306 local user_session = host_sessions[event.for_user].sessions;
307 local to = stanza.attr.to;
308 to = to and jid.split(to) or event.origin.username;
309
310 -- only notify if the stanza destination is the mam user we store it for
311 if event.for_user == to then
312 local user_push_services = push_store:get(to);
313 if not #user_push_services then return end
314
315 -- only notify nodes with no active sessions (smacks is counted as active and handled separately)
316 local notify_push_sevices = {};
317 for identifier, push_info in pairs(user_push_services) do
318 local identifier_found = nil;
319 for _, session in pairs(user_session) do
320 -- module:log("debug", "searching for '%s': identifier '%s' for session %s", tostring(identifier), tostring(session.push_identifier), tostring(session.full_jid));
321 if session.push_identifier == identifier then
322 identifier_found = session;
323 break;
324 end
325 end
326 if identifier_found then
327 identifier_found.log("debug", "Not notifying '%s' of new MAM stanza (session still alive)", identifier);
328 else
329 notify_push_sevices[identifier] = push_info;
330 end
331 end
332
333 return handle_notify_request(event.stanza, to, notify_push_sevices);
184 end 334 end
185 end 335 end
186 336
187 module:hook("smacks-hibernation-start", hibernate_session); 337 module:hook("smacks-hibernation-start", hibernate_session);
188 module:hook("smacks-hibernation-end", restore_session); 338 module:hook("smacks-hibernation-end", restore_session);
189 module:hook("smacks-ack-delayed", ack_delayed); 339 module:hook("smacks-ack-delayed", ack_delayed);
190 340 module:hook("archive-message-added", archive_message_added);
191 341
192 module:hook("message/offline/broadcast", function(event) 342 local function send_ping(event)
193 local origin = event.origin; 343 local user = event.user;
194 local user_push_services = push_enabled:get(origin.username); 344 local user_push_services = push_store:get(user);
195 if not user_push_services then return end 345 local push_services = event.push_services or user_push_services;
196 346 return handle_notify_request(nil, user, push_services);
197 for _, push_info in pairs(user_push_services) do 347 end
198 if push_info then 348 -- can be used by other modules to ping one or more (or all) push endpoints
199 push_info.count = 0; 349 module:hook("cloud-notify-ping", send_ping);
200 end 350
201 end 351 -- TODO: this has to be done on first connect not on offline broadcast, else the counter will be incorrect
202 push_enabled:set(origin.username, user_push_services); 352 -- TODO: it seems this is already done, so this could be safely removed, couldn't it?
203 end, 1); 353 -- module:hook("message/offline/broadcast", function(event)
354 -- local origin = event.origin;
355 -- local user_push_services = push_store:get(origin.username);
356 -- if not #user_push_services then return end
357 --
358 -- for _, push_info in pairs(user_push_services) do
359 -- if push_info then
360 -- push_info.count = 0;
361 -- end
362 -- end
363 -- push_store:set(origin.username, user_push_services);
364 -- end, 1);
365
366 function module.unload()
367 module:unhook("account-disco-info", account_dico_info);
368 module:unhook("iq-set/self/"..xmlns_push..":enable", push_enable);
369 module:unhook("iq-set/self/"..xmlns_push..":disable", push_disable);
370
371 module:unhook("smacks-hibernation-start", hibernate_session);
372 module:unhook("smacks-hibernation-end", restore_session);
373 module:unhook("smacks-ack-delayed", ack_delayed);
374 module:unhook("archive-message-added", archive_message_added);
375 module:unhook("cloud-notify-ping", send_ping);
376
377 for push_identifier, _ in pairs(push_errors) do
378 module:hook("iq-error/bare/"..push_identifier, handle_push_error);
379 module:hook("iq-result/bare/"..push_identifier, handle_push_success);
380 end
381 end