Software /
code /
prosody-modules
Comparison
mod_cloud_notify/mod_cloud_notify.lua @ 3619:74aa35aeb08a
mod_cloud_notify: only push once on csi queue flush in hibernated state, unhook response handlers
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Sat, 15 Jun 2019 01:26:15 +0200 |
parent | 3108:cfcb020bcd1d |
child | 3622:21f870e1ba55 |
comparison
equal
deleted
inserted
replaced
3618:f781a90018f4 | 3619:74aa35aeb08a |
---|---|
1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) | 1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) |
2 -- Copyright (C) 2015-2016 Kim Alvefur | 2 -- Copyright (C) 2015-2016 Kim Alvefur |
3 -- Copyright (C) 2017-2018 Thilo Molitor | 3 -- Copyright (C) 2017-2019 Thilo Molitor |
4 -- | 4 -- |
5 -- This file is MIT/X11 licensed. | 5 -- This file is MIT/X11 licensed. |
6 | 6 |
7 local t_insert = table.insert; | 7 local t_insert = table.insert; |
8 local s_match = string.match; | 8 local s_match = string.match; |
89 count = count + 1; | 89 count = count + 1; |
90 if count > maximum then break end | 90 if count > maximum then break end |
91 result[key] = value; | 91 result[key] = value; |
92 end | 92 end |
93 return result; | 93 return result; |
94 end | |
95 | |
96 local function stoppable_timer(delay, callback) | |
97 local stopped = false; | |
98 local timer = module:add_timer(delay, function (t) | |
99 if stopped then return; end | |
100 return callback(t); | |
101 end); | |
102 if timer.stop then return timer; end -- new prosody api includes stop() function | |
103 return { | |
104 stop = function () stopped = true end; | |
105 timer; | |
106 }; | |
94 end | 107 end |
95 | 108 |
96 -- For keeping state across reloads while caching reads | 109 -- For keeping state across reloads while caching reads |
97 local push_store = (function() | 110 local push_store = (function() |
98 local store = module:open_store(); | 111 local store = module:open_store(); |
193 | 206 |
194 for push_identifier, _ in pairs(user_push_services) do | 207 for push_identifier, _ in pairs(user_push_services) do |
195 if hashes.sha256(push_identifier, true) == stanza.attr.id then | 208 if hashes.sha256(push_identifier, true) == stanza.attr.id then |
196 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then | 209 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then |
197 push_errors[push_identifier] = 0; | 210 push_errors[push_identifier] = 0; |
211 -- unhook iq handlers for this identifier (if possible) | |
212 if module.unhook then | |
213 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); | |
214 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); | |
215 id2node[stanza.attr.id] = nil; | |
216 end | |
198 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); | 217 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); |
199 end | 218 end |
200 end | 219 end |
201 end | 220 end |
202 return true; | 221 return true; |
270 origin.first_hibernated_push = nil; | 289 origin.first_hibernated_push = nil; |
271 end | 290 end |
272 user_push_services[key] = nil; | 291 user_push_services[key] = nil; |
273 push_errors[key] = nil; | 292 push_errors[key] = nil; |
274 if module.unhook then | 293 if module.unhook then |
275 module:unhook("iq-error/host/"..key, handle_push_error); | 294 local stanza_id = hashes.sha256(key, true) |
276 module:unhook("iq-result/host/"..key, handle_push_success); | 295 module:unhook("iq-error/host/"..stanza_id, handle_push_error); |
277 id2node[key] = nil; | 296 module:unhook("iq-result/host/"..stanza_id, handle_push_success); |
297 id2node[stanza_id] = nil; | |
278 end | 298 end |
279 end | 299 end |
280 end | 300 end |
281 local ok = push_store:set(origin.username, user_push_services); | 301 local ok = push_store:set(origin.username, user_push_services); |
282 if not ok then | 302 if not ok then |
442 local node, user_push_services = get_push_settings(event.stanza, event.origin); | 462 local node, user_push_services = get_push_settings(event.stanza, event.origin); |
443 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); | 463 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); |
444 handle_notify_request(event.stanza, node, user_push_services, true); | 464 handle_notify_request(event.stanza, node, user_push_services, true); |
445 end, 1); | 465 end, 1); |
446 | 466 |
447 -- publish on unacked smacks message | 467 local function process_stanza_queue(queue, session, queue_type) |
448 local function process_smacks_stanza(stanza, session) | |
449 if session.push_identifier then | |
450 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza"); | |
451 local user_push_services = {[session.push_identifier] = session.push_settings}; | |
452 local node = get_push_settings(stanza, session); | |
453 if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then | |
454 if session.hibernating and not session.first_hibernated_push then | |
455 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string) | |
456 -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally, | |
457 -- then record the time of first push in the session for the smack module which will extend its hibernation | |
458 -- timeout based on the value of session.first_hibernated_push | |
459 if not dummy_body or (dummy_body and is_important(stanza)) then | |
460 session.first_hibernated_push = os_time(); | |
461 end | |
462 end | |
463 end | |
464 end | |
465 return stanza; | |
466 end | |
467 | |
468 local function process_smacks_queue(queue, session) | |
469 if not session.push_identifier then return; end | 468 if not session.push_identifier then return; end |
470 local user_push_services = {[session.push_identifier] = session.push_settings}; | 469 local user_push_services = {[session.push_identifier] = session.push_settings}; |
471 local notified = { unimportant = false; important = false } | 470 local notified = { unimportant = false; important = false } |
472 for i=1, #queue do | 471 for i=1, #queue do |
473 local stanza = queue[i]; | 472 local stanza = queue[i]; |
484 -- timeout based on the value of session.first_hibernated_push | 483 -- timeout based on the value of session.first_hibernated_push |
485 if not dummy_body or (dummy_body and is_important(stanza)) then | 484 if not dummy_body or (dummy_body and is_important(stanza)) then |
486 session.first_hibernated_push = os_time(); | 485 session.first_hibernated_push = os_time(); |
487 end | 486 end |
488 end | 487 end |
489 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type); | 488 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type); |
490 notified[stanza_type] = true | 489 notified[stanza_type] = true |
491 end | 490 end |
492 end | 491 end |
493 end | 492 end |
493 end | |
494 | |
495 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) | |
496 local function process_smacks_stanza(stanza, session) | |
497 if session.push_identifier then | |
498 if not session.push_queue then session.push_queue = {}; end | |
499 local queue = session.push_queue; | |
500 queue[#queue+1] = st.clone(stanza); | |
501 if #queue == 1 then -- first stanza --> start timer | |
502 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); | |
503 session.awaiting_push_timer = stoppable_timer(1e-06, function () | |
504 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); | |
505 process_stanza_queue(session.push_queue, session, "push"); | |
506 session.push_queue = {}; -- clean up queue after push | |
507 end); | |
508 end | |
509 end | |
510 return stanza; | |
494 end | 511 end |
495 | 512 |
496 -- smacks hibernation is started | 513 -- smacks hibernation is started |
497 local function hibernate_session(event) | 514 local function hibernate_session(event) |
498 local session = event.origin; | 515 local session = event.origin; |
499 local queue = event.queue; | 516 local queue = event.queue; |
500 session.first_hibernated_push = nil; | 517 session.first_hibernated_push = nil; |
501 -- process unacked stanzas | 518 -- process unacked stanzas |
502 process_smacks_queue(queue, session); | 519 process_stanza_queue(queue, session, "smacks"); |
503 -- process future unacked (hibernated) stanzas | 520 -- process future unacked (hibernated) stanzas |
504 filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990); | 521 filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990); |
505 end | 522 end |
506 | 523 |
507 -- smacks hibernation is ended | 524 -- smacks hibernation is ended |
508 local function restore_session(event) | 525 local function restore_session(event) |
509 local session = event.resumed; | 526 local session = event.resumed; |
510 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one | 527 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one |
511 filters.remove_filter(session, "stanzas/out", process_smacks_stanza); | 528 filters.remove_filter(session, "stanzas/out", process_smacks_stanza); |
529 if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end | |
512 session.first_hibernated_push = nil; | 530 session.first_hibernated_push = nil; |
513 end | 531 end |
514 end | 532 end |
515 | 533 |
516 -- smacks ack is delayed | 534 -- smacks ack is delayed |
517 local function ack_delayed(event) | 535 local function ack_delayed(event) |
518 local session = event.origin; | 536 local session = event.origin; |
519 local queue = event.queue; | 537 local queue = event.queue; |
520 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | 538 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) |
521 process_smacks_queue(queue, session); | 539 process_stanza_queue(queue, session, "smacks"); |
522 end | 540 end |
523 | 541 |
524 -- archive message added | 542 -- archive message added |
525 local function archive_message_added(event) | 543 local function archive_message_added(event) |
526 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } | 544 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } |
562 module:hook("smacks-hibernation-end", restore_session); | 580 module:hook("smacks-hibernation-end", restore_session); |
563 module:hook("smacks-ack-delayed", ack_delayed); | 581 module:hook("smacks-ack-delayed", ack_delayed); |
564 module:hook("archive-message-added", archive_message_added); | 582 module:hook("archive-message-added", archive_message_added); |
565 | 583 |
566 local function send_ping(event) | 584 local function send_ping(event) |
567 local user = event.user; | 585 local push_services = event.push_services; |
568 local user_push_services = push_store:get(user); | 586 if not push_services then |
569 local push_services = event.push_services or user_push_services; | 587 local user = event.user; |
588 push_services = push_store:get(user); | |
589 end | |
570 handle_notify_request(nil, user, push_services, true); | 590 handle_notify_request(nil, user, push_services, true); |
571 end | 591 end |
572 -- can be used by other modules to ping one or more (or all) push endpoints | 592 -- can be used by other modules to ping one or more (or all) push endpoints |
573 module:hook("cloud-notify-ping", send_ping); | 593 module:hook("cloud-notify-ping", send_ping); |
574 | 594 |