Changeset

3619:74aa35aeb08a

mod_cloud_notify: only push once on csi queue flush in hibernated state, unhook response handlers
author tmolitor <thilo@eightysoft.de>
date Sat, 15 Jun 2019 01:26:15 +0200
parents 3618:f781a90018f4
children 3620:fb1c8dee2ead
files mod_cloud_notify/mod_cloud_notify.lua
diffstat 1 files changed, 52 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/mod_cloud_notify/mod_cloud_notify.lua	Thu Jun 13 00:25:12 2019 +0200
+++ b/mod_cloud_notify/mod_cloud_notify.lua	Sat Jun 15 01:26:15 2019 +0200
@@ -1,6 +1,6 @@
 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections)
 -- Copyright (C) 2015-2016 Kim Alvefur
--- Copyright (C) 2017-2018 Thilo Molitor
+-- Copyright (C) 2017-2019 Thilo Molitor
 --
 -- This file is MIT/X11 licensed.
 
@@ -93,6 +93,19 @@
 	return result;
 end
 
+local function stoppable_timer(delay, callback)
+	local stopped = false;
+	local timer = module:add_timer(delay, function (t)
+		if stopped then return; end
+		return callback(t);
+	end);
+	if timer.stop then return timer; end		-- new prosody api includes stop() function
+	return {
+		stop = function () stopped = true end;
+		timer;
+	};
+end
+
 -- For keeping state across reloads while caching reads
 local push_store = (function()
 	local store = module:open_store();
@@ -195,6 +208,12 @@
 		if hashes.sha256(push_identifier, true) == stanza.attr.id then
 			if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then
 				push_errors[push_identifier] = 0;
+				-- unhook iq handlers for this identifier (if possible)
+				if module.unhook then
+					module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error);
+					module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success);
+					id2node[stanza.attr.id] = nil;
+				end
 				module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier]));
 			end
 		end
@@ -272,9 +291,10 @@
 			user_push_services[key] = nil;
 			push_errors[key] = nil;
 			if module.unhook then
-				module:unhook("iq-error/host/"..key, handle_push_error);
-				module:unhook("iq-result/host/"..key, handle_push_success);
-				id2node[key] = nil;
+				local stanza_id = hashes.sha256(key, true)
+				module:unhook("iq-error/host/"..stanza_id, handle_push_error);
+				module:unhook("iq-result/host/"..stanza_id, handle_push_success);
+				id2node[stanza_id] = nil;
 			end
 		end
 	end
@@ -444,28 +464,7 @@
 	handle_notify_request(event.stanza, node, user_push_services, true);
 end, 1);
 
--- publish on unacked smacks message
-local function process_smacks_stanza(stanza, session)
-	if session.push_identifier then
-		session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza");
-		local user_push_services = {[session.push_identifier] = session.push_settings};
-		local node = get_push_settings(stanza, session);
-		if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then
-			if session.hibernating and not session.first_hibernated_push then
-				-- if important stanzas are treated differently (pushed with last-message-body field set to dummy string)
-				-- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally,
-				-- then record the time of first push in the session for the smack module which will extend its hibernation
-				-- timeout based on the value of session.first_hibernated_push
-				if not dummy_body or (dummy_body and is_important(stanza)) then
-					session.first_hibernated_push = os_time();
-				end
-			end
-		end
-	end
-	return stanza;
-end
-
-local function process_smacks_queue(queue, session)
+local function process_stanza_queue(queue, session, queue_type)
 	if not session.push_identifier then return; end
 	local user_push_services = {[session.push_identifier] = session.push_settings};
 	local notified = { unimportant = false; important = false }
@@ -486,20 +485,38 @@
 						session.first_hibernated_push = os_time();
 					end
 				end
-				session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type);
+				session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type);
 				notified[stanza_type] = true
 			end
 		end
 	end
 end
 
+-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
+local function process_smacks_stanza(stanza, session)
+	if session.push_identifier then
+		if not session.push_queue then session.push_queue = {}; end
+		local queue = session.push_queue;
+		queue[#queue+1] = st.clone(stanza);
+		if #queue == 1 then		-- first stanza --> start timer
+			session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)");
+			session.awaiting_push_timer = stoppable_timer(1e-06, function ()
+				session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)");
+				process_stanza_queue(session.push_queue, session, "push");
+				session.push_queue = {};		-- clean up queue after push
+			end);
+		end
+	end
+	return stanza;
+end
+
 -- smacks hibernation is started
 local function hibernate_session(event)
 	local session = event.origin;
 	local queue = event.queue;
 	session.first_hibernated_push = nil;
 	-- process unacked stanzas
-	process_smacks_queue(queue, session);
+	process_stanza_queue(queue, session, "smacks");
 	-- process future unacked (hibernated) stanzas
 	filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990);
 end
@@ -509,6 +526,7 @@
 	local session = event.resumed;
 	if session then		-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
 		filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
+		if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end
 		session.first_hibernated_push = nil;
 	end
 end
@@ -518,7 +536,7 @@
 	local session = event.origin;
 	local queue = event.queue;
 	-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
-	process_smacks_queue(queue, session);
+	process_stanza_queue(queue, session, "smacks");
 end
 
 -- archive message added
@@ -564,9 +582,11 @@
 module:hook("archive-message-added", archive_message_added);
 
 local function send_ping(event)
-	local user = event.user;
-	local user_push_services = push_store:get(user);
-	local push_services = event.push_services or user_push_services;
+	local push_services = event.push_services;
+	if not push_services then
+		local user = event.user;
+		push_services = push_store:get(user);
+	end
 	handle_notify_request(nil, user, push_services, true);
 end
 -- can be used by other modules to ping one or more (or all) push endpoints