
mod_push2/mod_push2.lua @ 5682:4d1a3de56c3d

Initial work on Push 2.0
author Stephen Paul Weber <singpolyma@singpolyma.net>
date Tue, 19 Sep 2023 21:21:17 -0500
child 5683:bebb10fa5787
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_push2/mod_push2.lua	Tue Sep 19 21:21:17 2023 -0500
@@ -0,0 +1,587 @@
+local os_time = os.time;
+local st = require"util.stanza";
+local jid = require"util.jid";
+local hashes = require"util.hashes";
+local random = require"util.random";
+local watchdog = require "util.watchdog";
+local uuid = require "util.uuid";
+local base64 = require "util.encodings".base64;
+local ciphers = require "openssl.cipher";
+local pkey = require "openssl.pkey";
+local kdf = require "openssl.kdf";
+local jwt = require "util.jwt";
+local xmlns_push = "urn:xmpp:push2:0";
+-- configuration
+local contact_uri = module:get_option_string("contact_uri", "xmpp:" .. module.host)
+local extended_hibernation_timeout = module:get_option_number("push_max_hibernation_timeout", 72*3600)  -- use same timeout like ejabberd
+local host_sessions = prosody.hosts[module.host].sessions
+local push2_registrations = module:open_store("push2_registrations", "keyval")
+if _VERSION:match("5%.1") or _VERSION:match("5%.2") then
+	module:log("warn", "This module may behave incorrectly on Lua before 5.3. It is recommended to upgrade to a newer Lua version.")
+local function account_dico_info(event)
+	(event.reply or event.stanza):tag("feature", {var=xmlns_push}):up()
+module:hook("account-disco-info", account_dico_info);
+local function parse_match(matchel)
+		local match = { match = matchel.attr.profile }
+		local send = matchel:get_child("send", "urn:xmpp:push2:send:notify-only:0")
+		if send then
+			match.send = send.attr.xmlns
+			return match
+		end
+		send = matchel:get_child("send", "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0")
+		if send then
+			match.send = send.attr.xmlns
+			match.ua_public = send:get_child_text("ua-public")
+			match.auth_secret = send:get_child_text("auth-secret")
+			match.jwt_alg = send:get_child_text("jwt-alg")
+			match.jwt_key = send:get_child_text("jwt-key")
+			match.jwt_claims = {}
+			for claim in send:childtags("jwt-claim") do
+				match.jwt_claims[claim.attr.name] = claim:get_text()
+			end
+			return match
+		end
+		return nil
+local function push_enable(event)
+	local origin, stanza = event.origin, event.stanza;
+	local enable = stanza.tags[1];
+	origin.log("debug", "Attempting to enable push notifications")
+	-- MUST contain a jid of the push service being enabled
+	local service_jid = enable:get_child_text("service")
+	-- MUST contain a string to identify the client fo the push service
+	local client = enable:get_child_text("client")
+	if not service_jid then
+		origin.log("debug", "Push notification enable request missing service")
+		origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing service"))
+		return true
+	end
+	if not client then
+		origin.log("debug", "Push notification enable request missing client")
+		origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing client"))
+		return true
+	end
+	if service_jid == stanza.attr.from then
+		origin.log("debug", "Push notification enable request service JID identical to our own")
+		origin.send(st.error_reply(stanza, "modify", "bad-request", "JID must be different from ours"))
+		return true
+	end
+	local matches = {}
+	for matchel in enable:childtags("match") do
+		local match = parse_match(matchel)
+		if match then
+			matches[#matches + 1] = match
+		end
+	end
+	-- Tie registration to client, via client_id with sasl2 or else fallback to resource
+	local registration_id = origin.client_id or origin.resource
+	local push_registration = {
+		service = service_jid;
+		client = client;
+		timestamp = os_time();
+		matches = matches;
+	};
+	-- TODO: can we move to keyval+ on trunk?
+	local registrations = push2_registrations:get(origin.username) or {}
+	registrations[registration_id] = push_registration
+	if not push2_registrations:set(origin.username, registrations) then
+		origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
+	else
+		origin.push_registration_id = registration_id
+		origin.push_registration = push_registration
+		origin.first_hibernated_push = nil
+		origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(service_jid))
+		origin.send(st.reply(stanza))
+	end
+	return true
+module:hook("iq-set/self/"..xmlns_push..":enable", push_enable)
+-- urgent stanzas should be delivered without delay
+local function is_voip(stanza)
+	if stanza.name == "message" then
+		if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then
+			return true, "jingle call"
+		end
+	end
+local function has_body(stanza)
+	-- We can't check for body contents in encrypted messages, so let's treat them as important
+	-- Some clients don't even set a body or an empty body for encrypted messages
+	-- check omemo https://xmpp.org/extensions/inbox/omemo.html
+	if stanza:get_child("encrypted", "eu.siacs.conversations.axolotl") or stanza:get_child("encrypted", "urn:xmpp:omemo:0") then return true; end
+	-- check xep27 pgp https://xmpp.org/extensions/xep-0027.html
+	if stanza:get_child("x", "jabber:x:encrypted") then return true; end
+	-- check xep373 pgp (OX) https://xmpp.org/extensions/xep-0373.html
+	if stanza:get_child("openpgp", "urn:xmpp:openpgp:0") then return true; end
+	local body = stanza:get_child_text("body");
+	return body ~= nil and body ~= ""
+-- is this push a high priority one
+local function is_important(stanza)
+	local is_voip_stanza, urgent_reason = is_voip(stanza)
+	if is_voip_stanza then return true; end
+	local st_name = stanza and stanza.name or nil
+	if not st_name then return false; end -- nonzas are never important here
+	if st_name == "presence" then
+		return false; -- same for presences
+	elseif st_name == "message" then
+		-- unpack carbon copied message stanzas
+		local carbon = stanza:find("{urn:xmpp:carbons:2}/{urn:xmpp:forward:0}/{jabber:client}message")
+		local stanza_direction = carbon and stanza:child_with_name("sent") and "out" or "in"
+		if carbon then stanza = carbon; end
+		local st_type = stanza.attr.type
+		-- headline message are always not important
+		if st_type == "headline" then return false; end
+		-- carbon copied outgoing messages are not important
+		if carbon and stanza_direction == "out" then return false; end
+		-- groupchat subjects are not important here
+		if st_type == "groupchat" and stanza:get_child_text("subject") then
+			return false
+		end
+		-- empty bodies are not important
+		return has_body(stanza)
+	end
+	return false;		-- this stanza wasn't one of the above cases --> it is not important, too
+local function add_sce_rfc8291(match, stanza, push_notification_payload)
+	local max_data_size = 2847 -- https://github.com/web-push-libs/web-push-php/issues/108
+	local stanza_clone = st.clone(stanza)
+	stanza_clone.attr.xmlns = "jabber:client"
+	local envelope = st.stanza("envelope", { xmlns = "urn:xmpp:sce:1" })
+		:tag("content")
+		:tag("forwarded", { xmlns = "urn:xmpp:forward:0" })
+		:add_child(stanza_clone)
+		:up():up():up()
+	local envelope_bytes = tostring(envelope)
+	if string.len(envelope_bytes) > max_data_size then
+		-- If stanza is too big, remove extra elements
+		stanza_clone:maptags(function(el)
+			if el.attr.xmlns == nil or
+				el.attr.xmlns == "jabber:client" or
+				el.attr.xmlns == "jabber:x:oob" or
+				(el.attr.xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
+				el.attr.xmlns == "eu.siacs.conversations.axolotl" or
+				el.attr.xmlns == "urn:xmpp:omemo:0" or
+				el.attr.xmlns == "jabber:x:encrypted" or
+				el.attr.xmlns == "urn:xmpp:openpgp:0" or
+				el.attr.xmlns == "urn:xmpp:sce:1" or
+				el.attr.xmlns == "urn:xmpp:jingle-message:0" or
+				el.attr.xmlns == "jabber:x:conference"
+			then
+				return el
+			else
+				return nil
+			end
+		end)
+		envelope_bytes = tostring(envelope)
+	end
+	if string.len(envelope_bytes) > max_data_size then
+		-- If still too big, get aggressive
+		stanza_clone:maptags(function(el)
+			if el.name == "body" or
+				(el.attr.xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
+				el.attr.xmlns == "urn:xmpp:jingle-message:0" or
+				el.attr.xmlns == "jabber:x:conference"
+			then
+				return el
+			else
+				return nil
+			end
+		end)
+		envelope_bytes = tostring(envelope)
+	end
+	if string.len(envelope_bytes) < max_data_size/2 then
+		envelope:text_tag("rpad", base64.encode(random.bytes(math.min(150, max_data_size/3 - string.len(envelope_bytes)))))
+		envelope_bytes = tostring(envelope)
+	end
+	local p256dh_raw = base64.decode(match.ua_public .. "==")
+	local p256dh = pkey.new(p256dh_raw, "*", "public", "prime256v1")
+	local one_time_key = pkey.new({ type = "EC", curve = "prime256v1" })
+	local one_time_key_public = one_time_key:getParameters().pub_key:toBinary()
+	local info = "WebPush: info\0" .. p256dh_raw .. one_time_key_public
+	local auth_secret = base64.decode(match.auth_secret .. "==")
+	local salt = random.bytes(16)
+	local shared_secret = one_time_key:derive(p256dh)
+	local ikm = kdf.derive({
+		type = "HKDF",
+		outlen = 32,
+		salt = auth_secret,
+		key = shared_secret,
+		info = info,
+		md = "sha256"
+	})
+	local key = kdf.derive({
+		type = "HKDF",
+		outlen = 16,
+		salt = salt,
+		key = ikm,
+		info = "Content-Encoding: aes128gcm\0",
+		md = "sha256"
+	})
+	local nonce = kdf.derive({
+		type = "HKDF",
+		outlen = 12,
+		salt = salt,
+		key = ikm,
+		info = "Content-Encoding: nonce\0",
+		md = "sha256"
+	})
+	local header = salt .. "\0\0\16\0" .. string.char(string.len(one_time_key_public)) .. one_time_key_public
+	local encryptor = ciphers.new("AES-128-GCM"):encrypt(key, nonce)
+	push_notification_payload
+		:tag("encrypted", { xmlns = "urn:xmpp:sce:rfc8291:0" })
+		:text_tag("payload", base64.encode(header .. encryptor:final(envelope_bytes .. "\2") .. encryptor:getTag(16)))
+		:up()
+local function add_rfc8292(match, stanza, push_notification_payload)
+	if not match.jwt_alg then return; end
+	local key = match.jwt_key
+	if match.jwt_alg ~= "HS256" then
+		-- keypairs are in PKCS#8 PEM format without header/footer
+		key = "-----BEGIN PRIVATE KEY-----\n"..key.."\n-----END PRIVATE KEY-----"
+	end
+	local signer = jwt.new_signer(match.jwt_alg, key)
+	local payload = {}
+	for k, v in pairs(match.jwt_claims or {}) do
+		payload[k] = v
+	end
+	payload.sub = contact_uri
+	push_notification_payload:text_tag("jwt", signer(payload))
+local function handle_notify_request(stanza, node, user_push_services, log_push_decline)
+	local pushes = 0;
+	if not #user_push_services then return pushes end
+	local notify_push_services = {};
+	if is_important(stanza) then
+		notify_push_services = user_push_services
+	else
+		for identifier, push_info in pairs(user_push_services) do
+			for _, match in ipairs(push_info.matches) do
+				if match.match == "urn:xmpp:push2:match:important" then
+					identifier_found.log("debug", "Not pushing because not important")
+				else
+					notify_push_services[identifier] = push_info;
+				end
+			end
+		end
+	end
+	for push_registration_id, push_info in pairs(notify_push_services) do
+		local send_push = true;		-- only send push to this node when not already done for this stanza or if no stanza is given at all
+		if stanza then
+			if not stanza._push_notify2 then stanza._push_notify2 = {}; end
+			if stanza._push_notify2[push_registration_id] then
+				if log_push_decline then
+					module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
+				end
+				send_push = false;
+			end
+			stanza._push_notify2[push_registration_id] = true;
+		end
+		if send_push then
+			local push_notification_payload = st.stanza("notification", { xmlns = xmlns_push })
+			push_notification_payload:text_tag("client", push_info.client)
+			push_notification_payload:text_tag("priority", is_voip(stanza) and "high" or (is_important(stanza) and "normal" or "low"))
+			if is_voip(stanza) then
+				push_notification_payload:tag("voip"):up()
+			end
+			local sends_added = {};
+			for _, match in ipairs(push_info.matches) do
+				local does_match = false;
+				if match.match == "urn:xmpp:push2:match:all" then
+					does_match = true
+				elseif match.match == "urn:xmpp:push2:match:important" then
+					does_match = is_important(stanza)
+				elseif match.match == "urn:xmpp:push2:match:archived" then
+					does_match = stanza:get_child("stana-id", "urn:xmpp:sid:0")
+				elseif match.match == "urn:xmpp:push2:match:archived-with-body" then
+					does_match = stanza:get_child("stana-id", "urn:xmpp:sid:0") and has_body(stanza)
+				end
+				if does_match and not sends_added[match.send] then
+					sends_added[match.send] = true
+					if match.send == "urn:xmpp:push2:send:notify-only" then
+						-- Nothing more to add
+					elseif match.send == "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0" then
+						add_sce_rfc8291(match, stanza, push_notification_payload)
+						add_rfc8292(match, stanza, push_notification_payload)
+					else
+						module:log("debug", "Unkonwn send profile: " .. push_info.send)
+					end
+				end
+			end
+			local push_publish = st.message({ to = push_info.service, from = module.host, id = uuid.generate() })
+				:add_child(push_notification_payload):up()
+			-- TODO: watch for message error replies and count or something
+			module:send(push_publish)
+			pushes = pushes + 1
+		end
+	end
+	return pushes
+-- small helper function to extract relevant push settings
+local function get_push_settings(stanza, session)
+	local to = stanza.attr.to
+	local node = to and jid.split(to) or session.username
+	local user_push_services = push2_registrations:get(node)
+	return node, (user_push_services or {})
+-- publish on bare groupchat
+-- this picks up MUC messages when there are no devices connected
+module:hook("message/bare/groupchat", function(event)
+	local node, user_push_services = get_push_settings(event.stanza, event.origin);
+	local notify_push_services = {};
+	for identifier, push_info in pairs(user_push_services) do
+		for _, match in ipairs(push_info.matches) do
+			if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
+				identifier_found.log("debug", "Not pushing because we are not archiving this stanza")
+			else
+				notify_push_services[identifier] = push_info;
+			end
+		end
+	end
+	handle_notify_request(event.stanza, node, notify_push_services, true);
+end, 1);
+local function process_stanza_queue(queue, session, queue_type)
+	if not session.push_registration_id then return; end
+	for _, match in ipairs(session.push_settings.matches) do
+		if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
+			module:log("debug", "Not pushing because we are not archiving this stanza: %s", session.push_registration_id)
+			return
+		end
+	end
+	local user_push_services = {[session.push_registration_id] = session.push_settings};
+	local notified = { unimportant = false; important = false }
+	for i=1, #queue do
+		local stanza = queue[i];
+		-- fast ignore of already pushed stanzas
+		if stanza and not (stanza._push_notify2 and stanza._push_notify2[session.push_registration_id]) then
+			local node = get_push_settings(stanza, session);
+			local stanza_type = "unimportant";
+			if is_important(stanza) then stanza_type = "important"; end
+			if not notified[stanza_type] then		-- only notify if we didn't try to push for this stanza type already
+				if handle_notify_request(stanza, node, user_push_services, false) ~= 0 then
+					if session.hibernating and not session.first_hibernated_push then
+						-- if the message was important
+						-- then record the time of first push in the session for the smack module which will extend its hibernation
+						-- timeout based on the value of session.first_hibernated_push
+						if is_important(stanza) then
+							session.first_hibernated_push = os_time();
+							-- check for prosody 0.12 mod_smacks
+							if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
+								-- restore old smacks watchdog (--> the start of our original timeout will be delayed until first push)
+								session.hibernating_watchdog:cancel();
+								session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
+							end
+						end
+					end
+					notified[stanza_type] = true
+				end
+			end
+		end
+		if notified.unimportant and notified.important then break; end		-- stop processing the queue if all push types are exhausted
+	end
+-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
+local function process_stanza(session, stanza)
+	if session.push_registration_id then
+		session.log("debug", "adding new stanza to push_queue");
+		if not session.push_queue then session.push_queue = {}; end
+		local queue = session.push_queue;
+		queue[#queue+1] = st.clone(stanza);
+		if not session.awaiting_push_timer then		-- timer not already running --> start new timer
+			session.awaiting_push_timer = module:add_timer(1.0, function ()
+				process_stanza_queue(session.push_queue, session, "push");
+				session.push_queue = {};		-- clean up queue after push
+				session.awaiting_push_timer = nil;
+			end);
+		end
+	end
+	return stanza;
+local function process_smacks_stanza(event)
+	local session = event.origin;
+	local stanza = event.stanza;
+	if not session.push_registration_id then
+		session.log("debug", "NOT invoking handle_notify_request() for newly smacks queued stanza (session.push_registration_id is not set: %s)",
+			session.push_registration_id
+		);
+	else
+		process_stanza(session, stanza)
+	end
+-- smacks hibernation is started
+local function hibernate_session(event)
+	local session = event.origin;
+	local queue = event.queue;
+	session.first_hibernated_push = nil;
+	if session.push_registration_id and session.hibernating_watchdog then -- check for prosody 0.12 mod_smacks
+		-- save old watchdog callback and timeout
+		session.original_smacks_callback = session.hibernating_watchdog.callback;
+		session.original_smacks_timeout = session.hibernating_watchdog.timeout;
+		-- cancel old watchdog and create a new watchdog with extended timeout
+		session.hibernating_watchdog:cancel();
+		session.hibernating_watchdog = watchdog.new(extended_hibernation_timeout, function()
+			session.log("debug", "Push-extended smacks watchdog triggered");
+			if session.original_smacks_callback then
+				session.log("debug", "Calling original smacks watchdog handler");
+				session.original_smacks_callback();
+			end
+		end);
+	end
+	-- process unacked stanzas
+	process_stanza_queue(queue, session, "smacks");
+-- smacks hibernation is ended
+local function restore_session(event)
+	local session = event.resumed;
+	if session then		-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
+		if session.awaiting_push_timer then
+			session.awaiting_push_timer:stop();
+			session.awaiting_push_timer = nil;
+		end
+		session.first_hibernated_push = nil;
+		-- the extended smacks watchdog will be canceled by the smacks module, no need to anything here
+	end
+-- smacks ack is delayed
+local function ack_delayed(event)
+	local session = event.origin;
+	local queue = event.queue;
+	local stanza = event.stanza;
+	if not session.push_registration_id then return; end
+	if stanza then process_stanza(session, stanza); return; end		-- don't iterate through smacks queue if we know which stanza triggered this
+	for i=1, #queue do
+		local queued_stanza = queue[i];
+		-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
+		process_stanza(session, queued_stanza);
+	end
+-- archive message added
+local function archive_message_added(event)
+	-- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
+	if not event.for_user then return; end
+	-- Note that the stanza in the event is a clone not the same as other hooks, so dedupe doesn't work
+	-- This is a problem if you wan to to also hook offline message storage for example
+	local stanza = st.clone(event.stanza)
+	stanza:tag("stanza-id", { xmlns = "urn:xmpp:sid:0", by = event.for_user.."@"..module.host, id = event.id }):up()
+	local user_session = host_sessions[event.for_user] and host_sessions[event.for_user].sessions or {}
+	local to = stanza.attr.to
+	to = to and jid.split(to) or event.origin.username
+	-- only notify if the stanza destination is the mam user we store it for
+	if event.for_user == to then
+		local user_push_services = push2_registrations:get(to)
+		-- Urgent stanzas are time-sensitive (e.g. calls) and should
+		-- be pushed immediately to avoid getting stuck in the smacks
+		-- queue in case of dead connections, for example
+		local is_voip_stanza, urgent_reason = is_voip(stanza);
+		local notify_push_services;
+		if is_voip_stanza then
+			module:log("debug", "Urgent push for %s (%s)", to, urgent_reason);
+			notify_push_services = user_push_services;
+		else
+			-- only notify nodes with no active sessions (smacks is counted as active and handled separate)
+			notify_push_services = {};
+			for identifier, push_info in pairs(user_push_services) do
+				local identifier_found = nil;
+				for _, session in pairs(user_session) do
+					if session.push_registration_id == identifier then
+						identifier_found = session;
+						break;
+					end
+				end
+				if identifier_found then
+					identifier_found.log("debug", "Not pushing '%s' of new MAM stanza (session still alive)", identifier)
+				elseif not has_body(stanza) then
+					for _, match in ipairs(push_info.matches) do
+						if match.match == "urn:xmpp:push2:match:archived-with-body" then
+							identifier_found.log("debug", "Not pushing '%s' of new MAM stanza (no body)", identifier)
+						else
+							notify_push_services[identifier] = push_info
+						end
+					end
+				else
+					notify_push_services[identifier] = push_info
+				end
+			end
+		end
+		handle_notify_request(stanza, to, notify_push_services, true);
+	end
+module:hook("smacks-hibernation-start", hibernate_session);
+module:hook("smacks-hibernation-end", restore_session);
+module:hook("smacks-ack-delayed", ack_delayed);
+module:hook("smacks-hibernation-stanza-queued", process_smacks_stanza);
+module:hook("archive-message-added", archive_message_added);
+module:log("info", "Module loaded");
+function module.unload()
+	module:log("info", "Unloading module");
+	-- cleanup some settings, reloading this module can cause process_smacks_stanza() to stop working otherwise
+	for user, _ in pairs(host_sessions) do
+		for _, session in pairs(host_sessions[user].sessions) do
+			if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end
+			session.awaiting_push_timer = nil;
+			session.push_queue = nil;
+			session.first_hibernated_push = nil;
+			-- check for prosody 0.12 mod_smacks
+			if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
+				-- restore old smacks watchdog
+				session.hibernating_watchdog:cancel();
+				session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
+			end
+		end
+	end
+	module:log("info", "Module unloaded");