Comparison

mod_cloud_notify/mod_cloud_notify.lua @ 3619:74aa35aeb08a

mod_cloud_notify: only push once on csi queue flush in hibernated state, unhook response handlers
author tmolitor <thilo@eightysoft.de>
date Sat, 15 Jun 2019 01:26:15 +0200
parent 3108:cfcb020bcd1d
child 3622:21f870e1ba55
comparison
equal deleted inserted replaced
3618:f781a90018f4 3619:74aa35aeb08a
1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) 1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections)
2 -- Copyright (C) 2015-2016 Kim Alvefur 2 -- Copyright (C) 2015-2016 Kim Alvefur
3 -- Copyright (C) 2017-2018 Thilo Molitor 3 -- Copyright (C) 2017-2019 Thilo Molitor
4 -- 4 --
5 -- This file is MIT/X11 licensed. 5 -- This file is MIT/X11 licensed.
6 6
7 local t_insert = table.insert; 7 local t_insert = table.insert;
8 local s_match = string.match; 8 local s_match = string.match;
89 count = count + 1; 89 count = count + 1;
90 if count > maximum then break end 90 if count > maximum then break end
91 result[key] = value; 91 result[key] = value;
92 end 92 end
93 return result; 93 return result;
94 end
95
-- Schedule `callback` through module:add_timer() and hand back an object
-- that is guaranteed to offer a stop() method.
--
-- delay:    passed through unchanged to module:add_timer()
-- callback: invoked with the argument the timer passes in; its return value
--           is handed back to the timer unchanged
--
-- Returns the value produced by module:add_timer() when that value already
-- has a stop() member. Otherwise returns a wrapper table whose stop() only
-- sets a flag that suppresses `callback` when the timer fires; the raw
-- add_timer() result (if any) is kept at index 1 of the wrapper.
local function stoppable_timer(delay, callback)
	local stopped = false;
	local timer = module:add_timer(delay, function (t)
		if stopped then return; end
		return callback(t);
	end);
	-- Only index the result when it is a table: add_timer() may hand back
	-- nothing or a plain id, and indexing such a value would raise an error.
	if type(timer) == "table" and timer.stop then
		return timer; -- new prosody api includes stop() function
	end
	return {
		stop = function () stopped = true end;
		timer;
	};
end
95 108
96 -- For keeping state across reloads while caching reads 109 -- For keeping state across reloads while caching reads
97 local push_store = (function() 110 local push_store = (function()
98 local store = module:open_store(); 111 local store = module:open_store();
193 206
194 for push_identifier, _ in pairs(user_push_services) do 207 for push_identifier, _ in pairs(user_push_services) do
195 if hashes.sha256(push_identifier, true) == stanza.attr.id then 208 if hashes.sha256(push_identifier, true) == stanza.attr.id then
196 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then 209 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then
197 push_errors[push_identifier] = 0; 210 push_errors[push_identifier] = 0;
211 -- unhook iq handlers for this identifier (if possible)
212 if module.unhook then
213 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error);
214 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success);
215 id2node[stanza.attr.id] = nil;
216 end
198 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); 217 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier]));
199 end 218 end
200 end 219 end
201 end 220 end
202 return true; 221 return true;
270 origin.first_hibernated_push = nil; 289 origin.first_hibernated_push = nil;
271 end 290 end
272 user_push_services[key] = nil; 291 user_push_services[key] = nil;
273 push_errors[key] = nil; 292 push_errors[key] = nil;
274 if module.unhook then 293 if module.unhook then
275 module:unhook("iq-error/host/"..key, handle_push_error); 294 local stanza_id = hashes.sha256(key, true)
276 module:unhook("iq-result/host/"..key, handle_push_success); 295 module:unhook("iq-error/host/"..stanza_id, handle_push_error);
277 id2node[key] = nil; 296 module:unhook("iq-result/host/"..stanza_id, handle_push_success);
297 id2node[stanza_id] = nil;
278 end 298 end
279 end 299 end
280 end 300 end
281 local ok = push_store:set(origin.username, user_push_services); 301 local ok = push_store:set(origin.username, user_push_services);
282 if not ok then 302 if not ok then
442 local node, user_push_services = get_push_settings(event.stanza, event.origin); 462 local node, user_push_services = get_push_settings(event.stanza, event.origin);
443 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); 463 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza");
444 handle_notify_request(event.stanza, node, user_push_services, true); 464 handle_notify_request(event.stanza, node, user_push_services, true);
445 end, 1); 465 end, 1);
446 466
447 -- publish on unacked smacks message 467 local function process_stanza_queue(queue, session, queue_type)
448 local function process_smacks_stanza(stanza, session)
449 if session.push_identifier then
450 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza");
451 local user_push_services = {[session.push_identifier] = session.push_settings};
452 local node = get_push_settings(stanza, session);
453 if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then
454 if session.hibernating and not session.first_hibernated_push then
455 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string)
456 -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally,
457 -- then record the time of first push in the session for the smack module which will extend its hibernation
458 -- timeout based on the value of session.first_hibernated_push
459 if not dummy_body or (dummy_body and is_important(stanza)) then
460 session.first_hibernated_push = os_time();
461 end
462 end
463 end
464 end
465 return stanza;
466 end
467
468 local function process_smacks_queue(queue, session)
469 if not session.push_identifier then return; end 468 if not session.push_identifier then return; end
470 local user_push_services = {[session.push_identifier] = session.push_settings}; 469 local user_push_services = {[session.push_identifier] = session.push_settings};
471 local notified = { unimportant = false; important = false } 470 local notified = { unimportant = false; important = false }
472 for i=1, #queue do 471 for i=1, #queue do
473 local stanza = queue[i]; 472 local stanza = queue[i];
484 -- timeout based on the value of session.first_hibernated_push 483 -- timeout based on the value of session.first_hibernated_push
485 if not dummy_body or (dummy_body and is_important(stanza)) then 484 if not dummy_body or (dummy_body and is_important(stanza)) then
486 session.first_hibernated_push = os_time(); 485 session.first_hibernated_push = os_time();
487 end 486 end
488 end 487 end
489 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type); 488 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type);
490 notified[stanza_type] = true 489 notified[stanza_type] = true
491 end 490 end
492 end 491 end
493 end 492 end
493 end
494
-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
--
-- Used as a "stanzas/out" filter: always returns `stanza` unchanged.
-- When the session has a push_identifier, a clone of the stanza is appended
-- to session.push_queue. The first stanza of a batch starts a (practically
-- immediate) timer; when it fires, the whole batch is handed to
-- process_stanza_queue() in one go.
local function process_smacks_stanza(stanza, session)
	if not session.push_identifier then
		return stanza;
	end
	if not session.push_queue then session.push_queue = {}; end
	local queue = session.push_queue;
	queue[#queue+1] = st.clone(stanza);
	if #queue == 1 then -- first stanza --> start timer
		session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)");
		session.awaiting_push_timer = stoppable_timer(1e-06, function ()
			session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)");
			-- Detach the batch *before* processing it. If processing raises,
			-- the session still starts over with an empty queue; otherwise
			-- "#queue == 1" would never be true again and no further push
			-- timer would be started for this session.
			local batch = session.push_queue;
			session.push_queue = {};
			-- this timer has fired, so there is nothing left to stop
			session.awaiting_push_timer = nil;
			process_stanza_queue(batch, session, "push");
		end);
	end
	return stanza;
end
495 512
-- smacks hibernation is started
--
-- event.origin is the session entering hibernation and event.queue its
-- queue of unacked stanzas. Resets the session's first_hibernated_push
-- marker, runs the current queue through process_stanza_queue() and then
-- registers process_smacks_stanza as outgoing filter on the session.
local function hibernate_session(event)
	local hibernating, unacked = event.origin, event.queue;
	hibernating.first_hibernated_push = nil;
	-- stanzas that are already waiting for an ack
	process_stanza_queue(unacked, hibernating, "smacks");
	-- stanzas that will be queued while the session stays hibernated
	filters.add_filter(hibernating, "stanzas/out", process_smacks_stanza, -990);
end
506 523
-- smacks hibernation is ended
--
-- event.resumed is the session that has been resumed. Removes the outgoing
-- filter installed for hibernation, cancels a push timer that is still
-- pending and resets all per-hibernation push state kept on the session.
local function restore_session(event)
	local session = event.resumed;
	-- older smacks module versions send only the "intermediate" session and no event.resumed one
	if not session then return; end
	filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
	if session.awaiting_push_timer then
		session.awaiting_push_timer:stop();
		session.awaiting_push_timer = nil;
	end
	-- Stanzas collected for a timer that was just stopped must not survive:
	-- process_smacks_stanza only starts a timer for the first entry of
	-- session.push_queue, so leftovers would keep any later hibernation of
	-- this session from ever starting a push timer again.
	session.push_queue = nil;
	session.first_hibernated_push = nil;
end
515 533
-- smacks ack is delayed
--
-- Hands the session's queue of unacked stanzas (event.queue) together with
-- the session itself (event.origin) to process_stanza_queue().
local function ack_delayed(event)
	local waiting_session, unacked = event.origin, event.queue;
	-- handle_notify_request() will only send push requests for new stanzas,
	-- so passing the complete queue again is fine
	process_stanza_queue(unacked, waiting_session, "smacks");
end
523 541
524 -- archive message added 542 -- archive message added
525 local function archive_message_added(event) 543 local function archive_message_added(event)
526 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } 544 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
562 module:hook("smacks-hibernation-end", restore_session); 580 module:hook("smacks-hibernation-end", restore_session);
563 module:hook("smacks-ack-delayed", ack_delayed); 581 module:hook("smacks-ack-delayed", ack_delayed);
564 module:hook("archive-message-added", archive_message_added); 582 module:hook("archive-message-added", archive_message_added);
565 583
-- Handler for the "cloud-notify-ping" event.
--
-- event.user:          node that is passed on to handle_notify_request()
-- event.push_services: optional table of push services to ping; when it is
--                      missing, the services stored for event.user are
--                      loaded from push_store instead
local function send_ping(event)
	-- Read the user outside of the fallback branch below: it is needed for
	-- the handle_notify_request() call in both cases. Declaring it inside the
	-- branch would leave an undefined (nil) global to be passed on.
	local user = event.user;
	local push_services = event.push_services;
	if not push_services then
		push_services = push_store:get(user);
	end
	handle_notify_request(nil, user, push_services, true);
end
572 -- can be used by other modules to ping one or more (or all) push endpoints 592 -- can be used by other modules to ping one or more (or all) push endpoints
573 module:hook("cloud-notify-ping", send_ping); 593 module:hook("cloud-notify-ping", send_ping);
574 594