Comparison

mod_push2/mod_push2.lua @ 5682:4d1a3de56c3d

Initial work on Push 2.0
author Stephen Paul Weber <singpolyma@singpolyma.net>
date Tue, 19 Sep 2023 21:21:17 -0500
child 5683:bebb10fa5787
comparison
equal deleted inserted replaced
5681:7a4a6ded2bd6 5682:4d1a3de56c3d
1 local os_time = os.time;
2 local st = require"util.stanza";
3 local jid = require"util.jid";
4 local hashes = require"util.hashes";
5 local random = require"util.random";
6 local watchdog = require "util.watchdog";
7 local uuid = require "util.uuid";
8 local base64 = require "util.encodings".base64;
9 local ciphers = require "openssl.cipher";
10 local pkey = require "openssl.pkey";
11 local kdf = require "openssl.kdf";
12 local jwt = require "util.jwt";
13
-- Namespace of the Push 2.0 elements this module handles and emits
local xmlns_push = "urn:xmpp:push2:0";

-- configuration
-- contact_uri is placed in the "sub" claim of the JWTs generated for push services
local contact_uri = module:get_option_string("contact_uri", "xmpp:" .. module.host)
-- timeout of the watchdog that replaces the smacks one while a session hibernates
-- (use same timeout like ejabberd)
local extended_hibernation_timeout = module:get_option_number("push_max_hibernation_timeout", 72*3600)

-- sessions table of this host, and the keyval store holding the per-user registrations
local host_sessions = prosody.hosts[module.host].sessions
local push2_registrations = module:open_store("push2_registrations", "keyval")

-- warn when running on Lua 5.1 or 5.2
if _VERSION:match("5%.[12]") then
	module:log("warn", "This module may behave incorrectly on Lua before 5.3. It is recommended to upgrade to a newer Lua version.")
end
26
-- Advertise the Push 2.0 feature in the account's disco#info answer.
-- The feature is added to event.reply when present, otherwise to event.stanza.
local function account_dico_info(event)
	local target = event.reply or event.stanza
	target:tag("feature", { var = xmlns_push }):up()
end
module:hook("account-disco-info", account_dico_info);
31
-- Convert a <match/> element of an enable request into a plain table that can
-- be stored with the registration.
--
-- Returns a table with:
--   match       the value of the "profile" attribute
--   send        the namespace of the recognised <send/> child
-- and, for the sce+rfc8291+rfc8292 send profile only:
--   ua_public, auth_secret, jwt_alg, jwt_key   text of the matching children
--   jwt_claims  map of jwt-claim name -> text
-- Returns nil when the element carries no <send/> child this module knows.
local function parse_match(matchel)
	local match = { match = matchel.attr.profile }

	local send = matchel:get_child("send", "urn:xmpp:push2:send:notify-only:0")
	if send then
		match.send = send.attr.xmlns
		return match
	end

	send = matchel:get_child("send", "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0")
	if not send then
		return nil
	end

	match.send = send.attr.xmlns
	match.ua_public = send:get_child_text("ua-public")
	match.auth_secret = send:get_child_text("auth-secret")
	match.jwt_alg = send:get_child_text("jwt-alg")
	match.jwt_key = send:get_child_text("jwt-key")
	match.jwt_claims = {}
	for claim in send:childtags("jwt-claim") do
		-- The element comes straight from the client: a claim without a name
		-- would otherwise raise "table index is nil", so it is skipped.
		local claim_name = claim.attr.name
		if claim_name then
			match.jwt_claims[claim_name] = claim:get_text()
		end
	end
	return match
end
56
-- Handle an <enable/> request: validate it, store the registration for the
-- user and remember it on the session. Always returns true (event handled).
local function push_enable(event)
	local origin, stanza = event.origin, event.stanza;
	local enable = stanza.tags[1];

	-- log the reason, answer with a bad-request error and mark the event handled
	local function refuse(log_message, error_text)
		origin.log("debug", log_message)
		origin.send(st.error_reply(stanza, "modify", "bad-request", error_text))
		return true
	end

	origin.log("debug", "Attempting to enable push notifications")
	-- MUST contain a jid of the push service being enabled
	local service_jid = enable:get_child_text("service")
	-- MUST contain a string to identify the client to the push service
	local client = enable:get_child_text("client")

	if not service_jid then
		return refuse("Push notification enable request missing service", "Missing service")
	elseif not client then
		return refuse("Push notification enable request missing client", "Missing client")
	elseif service_jid == stanza.attr.from then
		return refuse("Push notification enable request service JID identical to our own", "JID must be different from ours")
	end

	local matches = {}
	for matchel in enable:childtags("match") do
		-- parse_match() yields nil for unsupported elements; assigning nil
		-- to the next free slot leaves the list unchanged
		matches[#matches + 1] = parse_match(matchel)
	end

	-- Tie registration to client, via client_id with sasl2 or else fallback to resource
	local registration_id = origin.client_id or origin.resource
	local push_registration = {
		service = service_jid,
		client = client,
		timestamp = os_time(),
		matches = matches,
	};

	-- TODO: can we move to keyval+ on trunk?
	local registrations = push2_registrations:get(origin.username) or {}
	registrations[registration_id] = push_registration
	local stored = push2_registrations:set(origin.username, registrations)
	if not stored then
		origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
		return true
	end

	origin.push_registration_id = registration_id
	origin.push_registration = push_registration
	origin.first_hibernated_push = nil
	origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(service_jid))
	origin.send(st.reply(stanza))
	return true
end
module:hook("iq-set/self/"..xmlns_push..":enable", push_enable)
110
-- urgent stanzas should be delivered without delay
--
-- Returns true plus a short reason when the stanza is a message carrying a
-- Jingle Message Initiation <propose/>; returns false otherwise, including
-- when no stanza is given (callers treat the stanza as optional).
local function is_voip(stanza)
	if not stanza or stanza.name ~= "message" then
		return false
	end
	if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then
		return true, "jingle call"
	end
	return false
end
119
-- Does this stanza carry content worth notifying about?
-- Returns true for a non-empty <body/> and for any of the encrypted payloads
-- listed below, false otherwise.
local function has_body(stanza)
	-- We can't check for body contents in encrypted messages, so let's treat them as important
	-- Some clients don't even set a body or an empty body for encrypted messages
	local encrypted_payloads = {
		-- omemo https://xmpp.org/extensions/inbox/omemo.html
		{ "encrypted", "eu.siacs.conversations.axolotl" },
		{ "encrypted", "urn:xmpp:omemo:0" },
		-- xep27 pgp https://xmpp.org/extensions/xep-0027.html
		{ "x", "jabber:x:encrypted" },
		-- xep373 pgp (OX) https://xmpp.org/extensions/xep-0373.html
		{ "openpgp", "urn:xmpp:openpgp:0" },
	}
	for _, payload in ipairs(encrypted_payloads) do
		if stanza:get_child(payload[1], payload[2]) then
			return true;
		end
	end

	local body = stanza:get_child_text("body");
	return not (body == nil or body == "")
end
137
-- is this push a high priority one
--
-- Returns true for VoIP stanzas and for incoming messages that have a body
-- (or an encrypted payload, see has_body()); false for everything else:
-- a missing stanza, nonzas, presences, iqs, headlines, outgoing carbon
-- copies and groupchat subject changes.
local function is_important(stanza)
	-- nonzas (and a missing stanza) are never important here; this has to be
	-- checked before anything dereferences the stanza
	local st_name = stanza and stanza.name or nil
	if not st_name then return false; end

	if is_voip(stanza) then return true; end

	-- presences, iqs and anything else that is not a message are not important
	if st_name ~= "message" then return false; end

	-- unpack carbon copied message stanzas
	local carbon = stanza:find("{urn:xmpp:carbons:2}/{urn:xmpp:forward:0}/{jabber:client}message")
	-- a carbon wrapped in <sent/> is a copy of a message the user sent themselves
	local outgoing_carbon = carbon ~= nil and stanza:get_child("sent", "urn:xmpp:carbons:2") ~= nil
	if carbon then stanza = carbon; end
	local st_type = stanza.attr.type

	-- headline message are always not important
	if st_type == "headline" then return false; end

	-- carbon copied outgoing messages are not important
	if outgoing_carbon then return false; end

	-- groupchat subjects are not important here
	if st_type == "groupchat" and stanza:get_child_text("subject") then
		return false
	end

	-- empty bodies are not important
	return has_body(stanza)
end
170
-- Add an encrypted copy of the stanza to the notification.
--
-- The stanza is wrapped in an SCE envelope, trimmed if it is too large,
-- encrypted with the aes128gcm content coding keyed as described in RFC 8291
-- and appended to push_notification_payload as
-- <encrypted xmlns="urn:xmpp:sce:rfc8291:0"><payload>base64</payload></encrypted>.
--
-- match                      registration match holding ua_public and auth_secret (base64 text)
-- stanza                     the stanza to forward; it is cloned, never modified
-- push_notification_payload  the <notification/> stanza being built
--
-- Nothing is added when the registration lacks the client key material.
local function add_sce_rfc8291(match, stanza, push_notification_payload)
	-- Both values come from the client's enable request and may be absent;
	-- without them encryption is impossible (and concatenating nil would raise).
	if not match.ua_public or not match.auth_secret then
		module:log("debug", "Not adding encrypted payload: registration is missing ua-public or auth-secret")
		return
	end

	local max_data_size = 2847 -- https://github.com/web-push-libs/web-push-php/issues/108
	local stanza_clone = st.clone(stanza)
	stanza_clone.attr.xmlns = "jabber:client"
	local envelope = st.stanza("envelope", { xmlns = "urn:xmpp:sce:1" })
		:tag("content")
			:tag("forwarded", { xmlns = "urn:xmpp:forward:0" })
				:add_child(stanza_clone)
			:up():up():up()
	local envelope_bytes = tostring(envelope)

	if string.len(envelope_bytes) > max_data_size then
		-- If stanza is too big, remove extra elements: keep only children in
		-- one of these namespaces, un-namespaced children and the stanza-id
		local keep_namespaces = {
			["jabber:client"] = true,
			["jabber:x:oob"] = true,
			["eu.siacs.conversations.axolotl"] = true,
			["urn:xmpp:omemo:0"] = true,
			["jabber:x:encrypted"] = true,
			["urn:xmpp:openpgp:0"] = true,
			["urn:xmpp:sce:1"] = true,
			["urn:xmpp:jingle-message:0"] = true,
			["jabber:x:conference"] = true,
		}
		stanza_clone:maptags(function(el)
			local xmlns = el.attr.xmlns
			if xmlns == nil or keep_namespaces[xmlns] or (xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") then
				return el
			end
			return nil
		end)
		envelope_bytes = tostring(envelope)
	end

	if string.len(envelope_bytes) > max_data_size then
		-- If still too big, get aggressive
		stanza_clone:maptags(function(el)
			local xmlns = el.attr.xmlns
			if el.name == "body" or
				(xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
				xmlns == "urn:xmpp:jingle-message:0" or
				xmlns == "jabber:x:conference"
			then
				return el
			end
			return nil
		end)
		envelope_bytes = tostring(envelope)
	end

	-- Pad short envelopes with a random <rpad/> element (presumably so that
	-- the ciphertext length reveals less about the message length).
	-- The size is clamped: for envelopes longer than a third of the maximum
	-- the computed size would be zero or negative, which must not reach
	-- random.bytes().
	local padding_size = math.floor(math.min(150, max_data_size/3 - string.len(envelope_bytes)))
	if padding_size > 0 then
		envelope:text_tag("rpad", base64.encode(random.bytes(padding_size)))
		envelope_bytes = tostring(envelope)
	end

	-- Key agreement between a fresh one-time EC key and the client's public key
	local p256dh_raw = base64.decode(match.ua_public .. "==")
	local p256dh = pkey.new(p256dh_raw, "*", "public", "prime256v1")
	local one_time_key = pkey.new({ type = "EC", curve = "prime256v1" })
	local one_time_key_public = one_time_key:getParameters().pub_key:toBinary()
	local info = "WebPush: info\0" .. p256dh_raw .. one_time_key_public
	local auth_secret = base64.decode(match.auth_secret .. "==")
	local salt = random.bytes(16)
	local shared_secret = one_time_key:derive(p256dh)

	-- Input keying material, then content encryption key and nonce, all via HKDF-SHA256
	local ikm = kdf.derive({
		type = "HKDF",
		outlen = 32,
		salt = auth_secret,
		key = shared_secret,
		info = info,
		md = "sha256"
	})
	local key = kdf.derive({
		type = "HKDF",
		outlen = 16,
		salt = salt,
		key = ikm,
		info = "Content-Encoding: aes128gcm\0",
		md = "sha256"
	})
	local nonce = kdf.derive({
		type = "HKDF",
		outlen = 12,
		salt = salt,
		key = ikm,
		info = "Content-Encoding: nonce\0",
		md = "sha256"
	})

	-- Header: salt, 4-byte record size (0x00001000), key id length, key id (our public key)
	local header = salt .. "\0\0\16\0" .. string.char(string.len(one_time_key_public)) .. one_time_key_public
	local encryptor = ciphers.new("AES-128-GCM"):encrypt(key, nonce)

	-- The plaintext is followed by a single "\2" byte before encryption; the
	-- 16-byte GCM tag is appended after the ciphertext
	push_notification_payload
		:tag("encrypted", { xmlns = "urn:xmpp:sce:rfc8291:0" })
			:text_tag("payload", base64.encode(header .. encryptor:final(envelope_bytes .. "\2") .. encryptor:getTag(16)))
		:up()
end
-- Add a signed JWT to the notification as a <jwt/> text child.
--
-- match                      registration match; jwt_alg selects the algorithm,
--                            jwt_key is the signing key and jwt_claims holds
--                            extra claims copied into the token
-- stanza                     unused, kept so the signature parallels add_sce_rfc8291()
-- push_notification_payload  the <notification/> stanza being built
--
-- Nothing is added when the registration has no jwt_alg or no jwt_key.
local function add_rfc8292(match, stanza, push_notification_payload)
	if not match.jwt_alg then return; end

	local key = match.jwt_key
	if not key then
		-- the key is client-supplied and optional in what parse_match() stores;
		-- wrapping nil in the PEM armour below would raise
		module:log("debug", "Not adding JWT: registration has jwt-alg %s but no jwt-key", tostring(match.jwt_alg))
		return
	end
	if match.jwt_alg ~= "HS256" then
		-- keypairs are in PKCS#8 PEM format without header/footer
		key = "-----BEGIN PRIVATE KEY-----\n"..key.."\n-----END PRIVATE KEY-----"
	end

	local signer = jwt.new_signer(match.jwt_alg, key)
	local payload = {}
	for k, v in pairs(match.jwt_claims or {}) do
		payload[k] = v
	end
	-- the configured contact URI always wins over a client-supplied "sub"
	payload.sub = contact_uri
	push_notification_payload:text_tag("jwt", signer(payload))
end
280
-- Send a push notification for a stanza to each applicable registration.
--
-- stanza              the stanza that triggered the push (may be nil)
-- node                username the push is for (only used for logging)
-- user_push_services  map of registration id -> registration, as stored by push_enable()
-- log_push_decline    when true, log registrations skipped because this
--                     stanza was already pushed to them
--
-- Returns the number of notifications sent.
local function handle_notify_request(stanza, node, user_push_services, log_push_decline)
	local pushes = 0;
	-- registrations are keyed by id, so emptiness has to be tested with next()
	if not user_push_services or next(user_push_services) == nil then return pushes end

	-- classify the stanza once; a missing stanza is neither voip nor important
	local voip, important = false, false
	if stanza then
		voip = is_voip(stanza) and true or false
		important = is_important(stanza) and true or false
	end

	local notify_push_services = {};
	if important then
		notify_push_services = user_push_services
	else
		-- unimportant stanza: only registrations with at least one match
		-- other than "important" are notified
		for identifier, push_info in pairs(user_push_services) do
			for _, match in ipairs(push_info.matches or {}) do
				if match.match == "urn:xmpp:push2:match:important" then
					module:log("debug", "Not pushing because not important")
				else
					notify_push_services[identifier] = push_info;
				end
			end
		end
	end

	for push_registration_id, push_info in pairs(notify_push_services) do
		local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all
		if stanza then
			if not stanza._push_notify2 then stanza._push_notify2 = {}; end
			if stanza._push_notify2[push_registration_id] then
				if log_push_decline then
					module:log("debug", "Already sent push notification for %s@%s to %s (%s)",
						node, module.host, tostring(push_info.service), tostring(push_info.client));
				end
				send_push = false;
			end
			stanza._push_notify2[push_registration_id] = true;
		end

		if send_push then
			local push_notification_payload = st.stanza("notification", { xmlns = xmlns_push })
			push_notification_payload:text_tag("client", push_info.client)
			push_notification_payload:text_tag("priority", voip and "high" or (important and "normal" or "low"))
			if voip then
				push_notification_payload:tag("voip"):up()
			end

			-- archived stanzas carry a <stanza-id/> (added by archive_message_added())
			local archived = stanza ~= nil and stanza:get_child("stanza-id", "urn:xmpp:sid:0") ~= nil

			-- each send profile contributes its payload at most once
			local sends_added = {};
			for _, match in ipairs(push_info.matches or {}) do
				local does_match = false;
				if match.match == "urn:xmpp:push2:match:all" then
					does_match = true
				elseif match.match == "urn:xmpp:push2:match:important" then
					does_match = important
				elseif match.match == "urn:xmpp:push2:match:archived" then
					does_match = archived
				elseif match.match == "urn:xmpp:push2:match:archived-with-body" then
					does_match = archived and has_body(stanza)
				end

				if does_match and not sends_added[match.send] then
					sends_added[match.send] = true
					-- these must equal the namespaces stored by parse_match()
					if match.send == "urn:xmpp:push2:send:notify-only:0" then
						-- Nothing more to add
					elseif match.send == "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0" then
						add_sce_rfc8291(match, stanza, push_notification_payload)
						add_rfc8292(match, stanza, push_notification_payload)
					else
						module:log("debug", "Unknown send profile: %s", tostring(match.send))
					end
				end
			end

			-- NOTE(review): the notification is sent even when no match applied; confirm this is intended
			local push_publish = st.message({ to = push_info.service, from = module.host, id = uuid.generate() })
				:add_child(push_notification_payload):up()

			-- TODO: watch for message error replies and count or something
			module:send(push_publish)
			pushes = pushes + 1
		end
	end

	return pushes
end
358
-- small helper function to extract relevant push settings
-- Returns the username the stanza is addressed to (falling back to the
-- session's username when the stanza has no "to" or the "to" has no node)
-- and that user's stored registrations (an empty table when there are none).
local function get_push_settings(stanza, session)
	local node = nil
	local to = stanza.attr.to
	if to then
		-- only the first value returned by jid.split() (the node) is wanted
		node = (jid.split(to))
	end
	if not node then
		node = session.username
	end
	local user_push_services = push2_registrations:get(node)
	return node, (user_push_services or {})
end
366
-- publish on bare groupchat
-- this picks up MUC messages when there are no devices connected
module:hook("message/bare/groupchat", function(event)
	local node, user_push_services = get_push_settings(event.stanza, event.origin);
	local notify_push_services = {};
	for identifier, push_info in pairs(user_push_services) do
		for _, match in ipairs(push_info.matches or {}) do
			if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
				-- there is no session to log through in this handler, so use the module logger
				module:log("debug", "Not pushing because we are not archiving this stanza")
			else
				notify_push_services[identifier] = push_info;
			end
		end
	end

	handle_notify_request(event.stanza, node, notify_push_services, true);
end, 1);
384
-- Push for the stanzas of a queue on behalf of one session.
--
-- At most one push is attempted per stanza type ("important" and
-- "unimportant"); stanzas already pushed to this registration are skipped.
-- Nothing is pushed when the registration has an "archived" or
-- "archived-with-body" match.
--
-- queue       array of stanzas
-- session     the session owning the queue; its registration is the one
--             stored on it by push_enable()
-- queue_type  label of the queue's origin ("push" or "smacks"), currently unused
local function process_stanza_queue(queue, session, queue_type)
	local registration_id = session.push_registration_id
	-- push_enable() stores the registration as session.push_registration
	local registration = session.push_registration
	if not registration_id or not registration then return; end

	for _, match in ipairs(registration.matches or {}) do
		if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
			module:log("debug", "Not pushing because we are not archiving this stanza: %s", registration_id)
			return
		end
	end

	local user_push_services = { [registration_id] = registration };
	local notified = { unimportant = false; important = false }
	for i = 1, #queue do
		local stanza = queue[i];
		-- fast ignore of already pushed stanzas
		if stanza and not (stanza._push_notify2 and stanza._push_notify2[registration_id]) then
			local node = get_push_settings(stanza, session);
			local stanza_type = "unimportant";
			if is_important(stanza) then stanza_type = "important"; end
			if not notified[stanza_type] then -- only notify if we didn't try to push for this stanza type already
				if handle_notify_request(stanza, node, user_push_services, false) ~= 0 then
					-- if the message was important
					-- then record the time of first push in the session for the smack module which will extend its hibernation
					-- timeout based on the value of session.first_hibernated_push
					if session.hibernating and not session.first_hibernated_push and stanza_type == "important" then
						session.first_hibernated_push = os_time();
						-- check for prosody 0.12 mod_smacks
						if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
							-- restore old smacks watchdog (--> the start of our original timeout will be delayed until first push)
							session.hibernating_watchdog:cancel();
							session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
						end
					end
					notified[stanza_type] = true
				end
			end
		end
		if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted
	end
end
425
-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
-- Queues a clone of the stanza on sessions that have push enabled and makes
-- sure a one second timer is running that will process the queue.
-- Always returns the stanza that was passed in.
local function process_stanza(session, stanza)
	if not session.push_registration_id then
		return stanza;
	end

	session.log("debug", "adding new stanza to push_queue");
	local queue = session.push_queue;
	if not queue then
		queue = {};
		session.push_queue = queue;
	end
	queue[#queue+1] = st.clone(stanza);

	-- timer already running --> it will pick up the stanza queued above
	if session.awaiting_push_timer then
		return stanza;
	end

	session.awaiting_push_timer = module:add_timer(1.0, function ()
		process_stanza_queue(session.push_queue, session, "push");
		session.push_queue = {}; -- clean up queue after push
		session.awaiting_push_timer = nil;
	end);
	return stanza;
end
443
-- A stanza was queued by smacks while the session hibernates: hand it to
-- process_stanza() when push is enabled on the session, otherwise only log.
local function process_smacks_stanza(event)
	local session, stanza = event.origin, event.stanza;
	if session.push_registration_id then
		process_stanza(session, stanza)
		return
	end
	session.log("debug", "NOT invoking handle_notify_request() for newly smacks queued stanza (session.push_registration_id is not set: %s)",
		session.push_registration_id
	);
end
455
-- smacks hibernation is started
-- For sessions with push enabled, the smacks watchdog is swapped for one
-- using the extended timeout; the original callback and timeout are kept
-- on the session. Afterwards the unacked stanzas are processed.
local function hibernate_session(event)
	local session, queue = event.origin, event.queue;
	session.first_hibernated_push = nil;

	local smacks_watchdog = session.hibernating_watchdog;
	if session.push_registration_id and smacks_watchdog then -- check for prosody 0.12 mod_smacks
		-- save old watchdog callback and timeout
		session.original_smacks_callback = smacks_watchdog.callback;
		session.original_smacks_timeout = smacks_watchdog.timeout;
		-- cancel old watchdog and create a new watchdog with extended timeout
		smacks_watchdog:cancel();
		local function extended_timeout_reached()
			session.log("debug", "Push-extended smacks watchdog triggered");
			if not session.original_smacks_callback then
				return;
			end
			session.log("debug", "Calling original smacks watchdog handler");
			session.original_smacks_callback();
		end
		session.hibernating_watchdog = watchdog.new(extended_hibernation_timeout, extended_timeout_reached);
	end

	-- process unacked stanzas
	process_stanza_queue(queue, session, "smacks");
end
478
-- smacks hibernation is ended
-- Stops a pending push timer of the resumed session and forgets the time
-- of its first hibernated push.
local function restore_session(event)
	local session = event.resumed;
	-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
	if not session then
		return;
	end

	local pending_timer = session.awaiting_push_timer;
	if pending_timer then
		pending_timer:stop();
		session.awaiting_push_timer = nil;
	end
	session.first_hibernated_push = nil;
	-- the extended smacks watchdog will be canceled by the smacks module, no need to do anything here
end
491
-- smacks ack is delayed
-- Queues the triggering stanza for push or, when the event names none,
-- every stanza of the smacks queue. Sessions without push are ignored.
local function ack_delayed(event)
	local session = event.origin;
	if not session.push_registration_id then
		return;
	end

	-- don't iterate through smacks queue if we know which stanza triggered this
	local trigger = event.stanza;
	if trigger then
		process_stanza(session, trigger);
		return;
	end

	-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
	local queue = event.queue;
	for index = 1, #queue do
		process_stanza(session, queue[index]);
	end
end
505
-- archive message added
-- Pushes for a stanza that was just stored in a user's archive.
-- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
-- VoIP stanzas are pushed to all of the user's registrations; other stanzas
-- only to registrations that have no live session.
local function archive_message_added(event)
	if not event.for_user then return; end
	-- Note that the stanza in the event is a clone not the same as other hooks, so dedupe doesn't work
	-- This is a problem if you want to also hook offline message storage for example
	local stanza = st.clone(event.stanza)
	stanza:tag("stanza-id", { xmlns = "urn:xmpp:sid:0", by = event.for_user.."@"..module.host, id = event.id }):up()
	local user_session = host_sessions[event.for_user] and host_sessions[event.for_user].sessions or {}
	local to = stanza.attr.to
	-- fall back to the sender's username; the origin is not guaranteed to be present
	to = to and jid.split(to) or (event.origin and event.origin.username)

	-- only notify if the stanza destination is the mam user we store it for
	if event.for_user ~= to then
		return
	end

	-- the store returns nil for users without any registration
	local user_push_services = push2_registrations:get(to) or {}

	-- Urgent stanzas are time-sensitive (e.g. calls) and should
	-- be pushed immediately to avoid getting stuck in the smacks
	-- queue in case of dead connections, for example
	local is_voip_stanza, urgent_reason = is_voip(stanza);

	local notify_push_services;
	if is_voip_stanza then
		module:log("debug", "Urgent push for %s (%s)", to, urgent_reason);
		notify_push_services = user_push_services;
	else
		-- only notify nodes with no active sessions (smacks is counted as active and handled separate)
		notify_push_services = {};
		local stanza_has_body = has_body(stanza)
		for identifier, push_info in pairs(user_push_services) do
			local identifier_found = nil;
			for _, session in pairs(user_session) do
				if session.push_registration_id == identifier then
					identifier_found = session;
					break;
				end
			end
			if identifier_found then
				identifier_found.log("debug", "Not pushing '%s' of new MAM stanza (session still alive)", identifier)
			elseif not stanza_has_body then
				for _, match in ipairs(push_info.matches or {}) do
					if match.match == "urn:xmpp:push2:match:archived-with-body" then
						-- no session exists in this branch, so log through the module
						module:log("debug", "Not pushing '%s' of new MAM stanza (no body)", identifier)
					else
						notify_push_services[identifier] = push_info
					end
				end
			else
				notify_push_services[identifier] = push_info
			end
		end
	end

	handle_notify_request(stanza, to, notify_push_services, true);
end
561
-- register the smacks and archive event handlers, in this order
local event_handlers = {
	{ "smacks-hibernation-start", hibernate_session },
	{ "smacks-hibernation-end", restore_session },
	{ "smacks-ack-delayed", ack_delayed },
	{ "smacks-hibernation-stanza-queued", process_smacks_stanza },
	{ "archive-message-added", archive_message_added },
}
for index = 1, #event_handlers do
	local entry = event_handlers[index]
	module:hook(entry[1], entry[2]);
end

module:log("info", "Module loaded");
-- Undo the per-session state this module keeps, for every session of the host.
function module.unload()
	module:log("info", "Unloading module");
	-- cleanup some settings, reloading this module can cause process_smacks_stanza() to stop working otherwise
	for _, user in pairs(host_sessions) do
		for _, session in pairs(user.sessions) do
			local pending_timer = session.awaiting_push_timer;
			if pending_timer then
				pending_timer:stop();
			end
			session.awaiting_push_timer = nil;
			session.push_queue = nil;
			session.first_hibernated_push = nil;

			-- check for prosody 0.12 mod_smacks
			local can_restore = session.hibernating_watchdog
				and session.original_smacks_callback
				and session.original_smacks_timeout;
			if can_restore then
				-- restore old smacks watchdog
				session.hibernating_watchdog:cancel();
				session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
			end
		end
	end
	module:log("info", "Module unloaded");
end