Software / code / prosody-modules
Comparison
mod_cloud_notify/mod_cloud_notify.lua @ 3619:74aa35aeb08a
mod_cloud_notify: only push once on csi queue flush in hibernated state, unhook response handlers
| author | tmolitor <thilo@eightysoft.de> |
|---|---|
| date | Sat, 15 Jun 2019 01:26:15 +0200 |
| parent | 3108:cfcb020bcd1d |
| child | 3622:21f870e1ba55 |
Comparison legend:
- equal
- deleted
- inserted
- replaced
| 3618:f781a90018f4 | 3619:74aa35aeb08a |
|---|---|
| 1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) | 1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections) |
| 2 -- Copyright (C) 2015-2016 Kim Alvefur | 2 -- Copyright (C) 2015-2016 Kim Alvefur |
| 3 -- Copyright (C) 2017-2018 Thilo Molitor | 3 -- Copyright (C) 2017-2019 Thilo Molitor |
| 4 -- | 4 -- |
| 5 -- This file is MIT/X11 licensed. | 5 -- This file is MIT/X11 licensed. |
| 6 | 6 |
| 7 local t_insert = table.insert; | 7 local t_insert = table.insert; |
| 8 local s_match = string.match; | 8 local s_match = string.match; |
| 89 count = count + 1; | 89 count = count + 1; |
| 90 if count > maximum then break end | 90 if count > maximum then break end |
| 91 result[key] = value; | 91 result[key] = value; |
| 92 end | 92 end |
| 93 return result; | 93 return result; |
| 94 end | |
| 95 | |
| 96 local function stoppable_timer(delay, callback) | |
| 97 local stopped = false; | |
| 98 local timer = module:add_timer(delay, function (t) | |
| 99 if stopped then return; end | |
| 100 return callback(t); | |
| 101 end); | |
| 102 if timer.stop then return timer; end -- new prosody api includes stop() function | |
| 103 return { | |
| 104 stop = function () stopped = true end; | |
| 105 timer; | |
| 106 }; | |
| 94 end | 107 end |
| 95 | 108 |
| 96 -- For keeping state across reloads while caching reads | 109 -- For keeping state across reloads while caching reads |
| 97 local push_store = (function() | 110 local push_store = (function() |
| 98 local store = module:open_store(); | 111 local store = module:open_store(); |
| 193 | 206 |
| 194 for push_identifier, _ in pairs(user_push_services) do | 207 for push_identifier, _ in pairs(user_push_services) do |
| 195 if hashes.sha256(push_identifier, true) == stanza.attr.id then | 208 if hashes.sha256(push_identifier, true) == stanza.attr.id then |
| 196 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then | 209 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then |
| 197 push_errors[push_identifier] = 0; | 210 push_errors[push_identifier] = 0; |
| 211 -- unhook iq handlers for this identifier (if possible) | |
| 212 if module.unhook then | |
| 213 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); | |
| 214 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); | |
| 215 id2node[stanza.attr.id] = nil; | |
| 216 end | |
| 198 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); | 217 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); |
| 199 end | 218 end |
| 200 end | 219 end |
| 201 end | 220 end |
| 202 return true; | 221 return true; |
| 270 origin.first_hibernated_push = nil; | 289 origin.first_hibernated_push = nil; |
| 271 end | 290 end |
| 272 user_push_services[key] = nil; | 291 user_push_services[key] = nil; |
| 273 push_errors[key] = nil; | 292 push_errors[key] = nil; |
| 274 if module.unhook then | 293 if module.unhook then |
| 275 module:unhook("iq-error/host/"..key, handle_push_error); | 294 local stanza_id = hashes.sha256(key, true) |
| 276 module:unhook("iq-result/host/"..key, handle_push_success); | 295 module:unhook("iq-error/host/"..stanza_id, handle_push_error); |
| 277 id2node[key] = nil; | 296 module:unhook("iq-result/host/"..stanza_id, handle_push_success); |
| 297 id2node[stanza_id] = nil; | |
| 278 end | 298 end |
| 279 end | 299 end |
| 280 end | 300 end |
| 281 local ok = push_store:set(origin.username, user_push_services); | 301 local ok = push_store:set(origin.username, user_push_services); |
| 282 if not ok then | 302 if not ok then |
| 442 local node, user_push_services = get_push_settings(event.stanza, event.origin); | 462 local node, user_push_services = get_push_settings(event.stanza, event.origin); |
| 443 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); | 463 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); |
| 444 handle_notify_request(event.stanza, node, user_push_services, true); | 464 handle_notify_request(event.stanza, node, user_push_services, true); |
| 445 end, 1); | 465 end, 1); |
| 446 | 466 |
| 447 -- publish on unacked smacks message | 467 local function process_stanza_queue(queue, session, queue_type) |
| 448 local function process_smacks_stanza(stanza, session) | |
| 449 if session.push_identifier then | |
| 450 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza"); | |
| 451 local user_push_services = {[session.push_identifier] = session.push_settings}; | |
| 452 local node = get_push_settings(stanza, session); | |
| 453 if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then | |
| 454 if session.hibernating and not session.first_hibernated_push then | |
| 455 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string) | |
| 456 -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally, | |
| 457 -- then record the time of first push in the session for the smack module which will extend its hibernation | |
| 458 -- timeout based on the value of session.first_hibernated_push | |
| 459 if not dummy_body or (dummy_body and is_important(stanza)) then | |
| 460 session.first_hibernated_push = os_time(); | |
| 461 end | |
| 462 end | |
| 463 end | |
| 464 end | |
| 465 return stanza; | |
| 466 end | |
| 467 | |
| 468 local function process_smacks_queue(queue, session) | |
| 469 if not session.push_identifier then return; end | 468 if not session.push_identifier then return; end |
| 470 local user_push_services = {[session.push_identifier] = session.push_settings}; | 469 local user_push_services = {[session.push_identifier] = session.push_settings}; |
| 471 local notified = { unimportant = false; important = false } | 470 local notified = { unimportant = false; important = false } |
| 472 for i=1, #queue do | 471 for i=1, #queue do |
| 473 local stanza = queue[i]; | 472 local stanza = queue[i]; |
| 484 -- timeout based on the value of session.first_hibernated_push | 483 -- timeout based on the value of session.first_hibernated_push |
| 485 if not dummy_body or (dummy_body and is_important(stanza)) then | 484 if not dummy_body or (dummy_body and is_important(stanza)) then |
| 486 session.first_hibernated_push = os_time(); | 485 session.first_hibernated_push = os_time(); |
| 487 end | 486 end |
| 488 end | 487 end |
| 489 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type); | 488 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type); |
| 490 notified[stanza_type] = true | 489 notified[stanza_type] = true |
| 491 end | 490 end |
| 492 end | 491 end |
| 493 end | 492 end |
| 493 end | |
| 494 | |
| 495 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) | |
| 496 local function process_smacks_stanza(stanza, session) | |
| 497 if session.push_identifier then | |
| 498 if not session.push_queue then session.push_queue = {}; end | |
| 499 local queue = session.push_queue; | |
| 500 queue[#queue+1] = st.clone(stanza); | |
| 501 if #queue == 1 then -- first stanza --> start timer | |
| 502 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); | |
| 503 session.awaiting_push_timer = stoppable_timer(1e-06, function () | |
| 504 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); | |
| 505 process_stanza_queue(session.push_queue, session, "push"); | |
| 506 session.push_queue = {}; -- clean up queue after push | |
| 507 end); | |
| 508 end | |
| 509 end | |
| 510 return stanza; | |
| 494 end | 511 end |
| 495 | 512 |
| 496 -- smacks hibernation is started | 513 -- smacks hibernation is started |
| 497 local function hibernate_session(event) | 514 local function hibernate_session(event) |
| 498 local session = event.origin; | 515 local session = event.origin; |
| 499 local queue = event.queue; | 516 local queue = event.queue; |
| 500 session.first_hibernated_push = nil; | 517 session.first_hibernated_push = nil; |
| 501 -- process unacked stanzas | 518 -- process unacked stanzas |
| 502 process_smacks_queue(queue, session); | 519 process_stanza_queue(queue, session, "smacks"); |
| 503 -- process future unacked (hibernated) stanzas | 520 -- process future unacked (hibernated) stanzas |
| 504 filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990); | 521 filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990); |
| 505 end | 522 end |
| 506 | 523 |
| 507 -- smacks hibernation is ended | 524 -- smacks hibernation is ended |
| 508 local function restore_session(event) | 525 local function restore_session(event) |
| 509 local session = event.resumed; | 526 local session = event.resumed; |
| 510 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one | 527 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one |
| 511 filters.remove_filter(session, "stanzas/out", process_smacks_stanza); | 528 filters.remove_filter(session, "stanzas/out", process_smacks_stanza); |
| 529 if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end | |
| 512 session.first_hibernated_push = nil; | 530 session.first_hibernated_push = nil; |
| 513 end | 531 end |
| 514 end | 532 end |
| 515 | 533 |
| 516 -- smacks ack is delayed | 534 -- smacks ack is delayed |
| 517 local function ack_delayed(event) | 535 local function ack_delayed(event) |
| 518 local session = event.origin; | 536 local session = event.origin; |
| 519 local queue = event.queue; | 537 local queue = event.queue; |
| 520 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | 538 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) |
| 521 process_smacks_queue(queue, session); | 539 process_stanza_queue(queue, session, "smacks"); |
| 522 end | 540 end |
| 523 | 541 |
| 524 -- archive message added | 542 -- archive message added |
| 525 local function archive_message_added(event) | 543 local function archive_message_added(event) |
| 526 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } | 544 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } |
| 562 module:hook("smacks-hibernation-end", restore_session); | 580 module:hook("smacks-hibernation-end", restore_session); |
| 563 module:hook("smacks-ack-delayed", ack_delayed); | 581 module:hook("smacks-ack-delayed", ack_delayed); |
| 564 module:hook("archive-message-added", archive_message_added); | 582 module:hook("archive-message-added", archive_message_added); |
| 565 | 583 |
| 566 local function send_ping(event) | 584 local function send_ping(event) |
| 567 local user = event.user; | 585 local push_services = event.push_services; |
| 568 local user_push_services = push_store:get(user); | 586 if not push_services then |
| 569 local push_services = event.push_services or user_push_services; | 587 local user = event.user; |
| 588 push_services = push_store:get(user); | |
| 589 end | |
| 570 handle_notify_request(nil, user, push_services, true); | 590 handle_notify_request(nil, user, push_services, true); |
| 571 end | 591 end |
| 572 -- can be used by other modules to ping one or more (or all) push endpoints | 592 -- can be used by other modules to ping one or more (or all) push endpoints |
| 573 module:hook("cloud-notify-ping", send_ping); | 593 module:hook("cloud-notify-ping", send_ping); |
| 574 | 594 |