Comparison

mod_cloud_notify/mod_cloud_notify.lua @ 3108:cfcb020bcd1d

mod_cloud_notify: inform mod_smacks of first push in hibernated state
author tmolitor <thilo@eightysoft.de>
date Fri, 08 Jun 2018 17:39:07 +0200
parent 3085:1ea6861b533f
child 3619:74aa35aeb08a
comparison
equal deleted inserted replaced
3107:f703cc6e72df 3108:cfcb020bcd1d
125 services[push_identifier] = data; 125 services[push_identifier] = data;
126 return self:set(user, services); 126 return self:set(user, services);
127 end 127 end
128 return api; 128 return api;
129 end)(); 129 end)();
130
130 131
131 -- Forward declarations, as both functions need to reference each other 132 -- Forward declarations, as both functions need to reference each other
132 local handle_push_success, handle_push_error; 133 local handle_push_success, handle_push_error;
133 134
134 function handle_push_error(event) 135 function handle_push_error(event)
154 if host_sessions[node] then 155 if host_sessions[node] then
155 for _, session in pairs(host_sessions[node].sessions) do 156 for _, session in pairs(host_sessions[node].sessions) do
156 if session.push_identifier == push_identifier then 157 if session.push_identifier == push_identifier then
157 session.push_identifier = nil; 158 session.push_identifier = nil;
158 session.push_settings = nil; 159 session.push_settings = nil;
160 session.first_hibernated_push = nil;
159 end 161 end
160 end 162 end
161 end 163 end
162 -- save changed global config 164 -- save changed global config
163 changed = true; 165 changed = true;
239 if not ok then 241 if not ok then
240 origin.send(st.error_reply(stanza, "wait", "internal-server-error")); 242 origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
241 else 243 else
242 origin.push_identifier = push_identifier; 244 origin.push_identifier = push_identifier;
243 origin.push_settings = push_service; 245 origin.push_settings = push_service;
246 origin.first_hibernated_push = nil;
244 origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(origin.push_identifier)); 247 origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(origin.push_identifier));
245 origin.send(st.reply(stanza)); 248 origin.send(st.reply(stanza));
246 end 249 end
247 return true; 250 return true;
248 end 251 end
262 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then 265 if push_info.jid == push_jid and (not push_node or push_info.node == push_node) then
263 origin.log("info", "Push notifications disabled (%s)", tostring(key)); 266 origin.log("info", "Push notifications disabled (%s)", tostring(key));
264 if origin.push_identifier == key then 267 if origin.push_identifier == key then
265 origin.push_identifier = nil; 268 origin.push_identifier = nil;
266 origin.push_settings = nil; 269 origin.push_settings = nil;
270 origin.first_hibernated_push = nil;
267 end 271 end
268 user_push_services[key] = nil; 272 user_push_services[key] = nil;
269 push_errors[key] = nil; 273 push_errors[key] = nil;
270 if module.unhook then 274 if module.unhook then
271 module:unhook("iq-error/host/"..key, handle_push_error); 275 module:unhook("iq-error/host/"..key, handle_push_error);
362 { name = "last-message-sender"; type = "jid-single"; }; 366 { name = "last-message-sender"; type = "jid-single"; };
363 { name = "last-message-body"; type = "text-single"; }; 367 { name = "last-message-body"; type = "text-single"; };
364 }; 368 };
365 369
366 -- http://xmpp.org/extensions/xep-0357.html#publishing 370 -- http://xmpp.org/extensions/xep-0357.html#publishing
367 local function handle_notify_request(stanza, node, user_push_services) 371 local function handle_notify_request(stanza, node, user_push_services, log_push_decline)
368 local pushes = 0; 372 local pushes = 0;
369 if not user_push_services or next(user_push_services) == nil then return pushes end 373 if not user_push_services or next(user_push_services) == nil then return pushes end
370 374
371 for push_identifier, push_info in pairs(user_push_services) do 375 for push_identifier, push_info in pairs(user_push_services) do
372 local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all 376 local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all
373 if stanza then 377 if stanza then
374 if not stanza._push_notify then stanza._push_notify = {}; end 378 if not stanza._push_notify then stanza._push_notify = {}; end
375 if stanza._push_notify[push_identifier] then 379 if stanza._push_notify[push_identifier] then
376 module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node)); 380 if log_push_decline then
381 module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
382 end
377 send_push = false; 383 send_push = false;
378 end 384 end
379 stanza._push_notify[push_identifier] = true; 385 stanza._push_notify[push_identifier] = true;
380 end 386 end
381 387
394 if stanza and include_sender then 400 if stanza and include_sender then
395 form_data["last-message-sender"] = stanza.attr.from; 401 form_data["last-message-sender"] = stanza.attr.from;
396 end 402 end
397 if stanza and include_body then 403 if stanza and include_body then
398 form_data["last-message-body"] = stanza:get_child_text("body"); 404 form_data["last-message-body"] = stanza:get_child_text("body");
399 elseif stanza and dummy_body ~= "" and is_important(stanza) then 405 elseif stanza and dummy_body and is_important(stanza) then
400 form_data["last-message-body"] = dummy_body; 406 form_data["last-message-body"] = tostring(dummy_body);
401 end 407 end
402 push_publish:add_child(push_form:form(form_data)); 408 push_publish:add_child(push_form:form(form_data));
403 push_publish:up(); -- / notification 409 push_publish:up(); -- / notification
404 push_publish:up(); -- / publish 410 push_publish:up(); -- / publish
405 push_publish:up(); -- / pubsub 411 push_publish:up(); -- / pubsub
406 if push_info.options then 412 if push_info.options then
407 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options)); 413 push_publish:tag("publish-options"):add_child(st.deserialize(push_info.options));
408 end 414 end
409 -- send out push 415 -- send out push
410 module:log("debug", "Sending push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node)); 416 module:log("debug", "Sending%s push notification for %s@%s to %s (%s)", form_data["last-message-body"] and " important" or "", node, module.host, push_info.jid, tostring(push_info.node));
411 -- module:log("debug", "PUSH STANZA: %s", tostring(push_publish)); 417 -- module:log("debug", "PUSH STANZA: %s", tostring(push_publish));
412 -- handle push errors for this node 418 -- handle push errors for this node
413 if push_errors[push_identifier] == nil then 419 if push_errors[push_identifier] == nil then
414 push_errors[push_identifier] = 0; 420 push_errors[push_identifier] = 0;
415 module:hook("iq-error/host/"..stanza_id, handle_push_error); 421 module:hook("iq-error/host/"..stanza_id, handle_push_error);
433 439
434 -- publish on offline message 440 -- publish on offline message
435 module:hook("message/offline/handle", function(event) 441 module:hook("message/offline/handle", function(event)
436 local node, user_push_services = get_push_settings(event.stanza, event.origin); 442 local node, user_push_services = get_push_settings(event.stanza, event.origin);
437 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); 443 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza");
438 handle_notify_request(event.stanza, node, user_push_services); 444 handle_notify_request(event.stanza, node, user_push_services, true);
439 end, 1); 445 end, 1);
440 446
441 -- publish on unacked smacks message 447 -- publish on unacked smacks message
442 local function process_smacks_stanza(stanza, session) 448 local function process_smacks_stanza(stanza, session)
443 if session.push_identifier then 449 if session.push_identifier then
444 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza"); 450 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza");
445 local user_push_services = {[session.push_identifier] = session.push_settings}; 451 local user_push_services = {[session.push_identifier] = session.push_settings};
446 local node = get_push_settings(stanza, session); 452 local node = get_push_settings(stanza, session);
447 handle_notify_request(stanza, node, user_push_services); 453 if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then
454 if session.hibernating and not session.first_hibernated_push then
455 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string)
456 -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally,
457 -- then record the time of first push in the session for the smack module which will extend its hibernation
458 -- timeout based on the value of session.first_hibernated_push
459 if not dummy_body or (dummy_body and is_important(stanza)) then
460 session.first_hibernated_push = os_time();
461 end
462 end
463 end
448 end 464 end
449 return stanza; 465 return stanza;
450 end 466 end
451 467
452 local function process_smacks_queue(queue, session) 468 local function process_smacks_queue(queue, session)
453 if not session.push_identifier then return; end 469 if not session.push_identifier then return; end
454 local user_push_services = {[session.push_identifier] = session.push_settings}; 470 local user_push_services = {[session.push_identifier] = session.push_settings};
471 local notified = { unimportant = false; important = false }
455 for i=1, #queue do 472 for i=1, #queue do
456 local stanza = queue[i]; 473 local stanza = queue[i];
457 local node = get_push_settings(stanza, session); 474 local node = get_push_settings(stanza, session);
458 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza: %d", i); 475 stanza_type = "unimportant"
459 if handle_notify_request(stanza, node, user_push_services) ~= 0 then 476 if dummy_body and is_important(stanza) then stanza_type = "important"; end
460 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas"); 477 if not notified[stanza_type] then -- only notify if we didn't try to push for this stanza type already
461 return; -- only notify for one stanza in the queue, not for all in a row 478 -- session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza: %d", i);
479 if handle_notify_request(stanza, node, user_push_services, false) ~= 0 then
480 if session.hibernating and not session.first_hibernated_push then
481 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string)
482 -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally,
483 -- then record the time of first push in the session for the smack module which will extend its hibernation
484 -- timeout based on the value of session.first_hibernated_push
485 if not dummy_body or (dummy_body and is_important(stanza)) then
486 session.first_hibernated_push = os_time();
487 end
488 end
489 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type);
490 notified[stanza_type] = true
491 end
462 end 492 end
463 end 493 end
464 end 494 end
465 495
466 -- smacks hibernation is started 496 -- smacks hibernation is started
467 local function hibernate_session(event) 497 local function hibernate_session(event)
468 local session = event.origin; 498 local session = event.origin;
469 local queue = event.queue; 499 local queue = event.queue;
500 session.first_hibernated_push = nil;
470 -- process unacked stanzas 501 -- process unacked stanzas
471 process_smacks_queue(queue, session); 502 process_smacks_queue(queue, session);
472 -- process future unacked (hibernated) stanzas 503 -- process future unacked (hibernated) stanzas
473 filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990); 504 filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990);
474 end 505 end
476 -- smacks hibernation is ended 507 -- smacks hibernation is ended
477 local function restore_session(event) 508 local function restore_session(event)
478 local session = event.resumed; 509 local session = event.resumed;
479 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one 510 if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
480 filters.remove_filter(session, "stanzas/out", process_smacks_stanza); 511 filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
512 session.first_hibernated_push = nil;
481 end 513 end
482 end 514 end
483 515
484 -- smacks ack is delayed 516 -- smacks ack is delayed
485 local function ack_delayed(event) 517 local function ack_delayed(event)
503 if event.for_user == to then 535 if event.for_user == to then
504 local user_push_services = push_store:get(to); 536 local user_push_services = push_store:get(to);
505 if next(user_push_services) == nil then return end 537 if next(user_push_services) == nil then return end
506 538
507 -- only notify nodes with no active sessions (smacks is counted as active and handled separate) 539 -- only notify nodes with no active sessions (smacks is counted as active and handled separate)
508 local notify_push_sevices = {}; 540 local notify_push_services = {};
509 for identifier, push_info in pairs(user_push_services) do 541 for identifier, push_info in pairs(user_push_services) do
510 local identifier_found = nil; 542 local identifier_found = nil;
511 for _, session in pairs(user_session) do 543 for _, session in pairs(user_session) do
512 -- module:log("debug", "searching for '%s': identifier '%s' for session %s", tostring(identifier), tostring(session.push_identifier), tostring(session.full_jid)); 544 -- module:log("debug", "searching for '%s': identifier '%s' for session %s", tostring(identifier), tostring(session.push_identifier), tostring(session.full_jid));
513 if session.push_identifier == identifier then 545 if session.push_identifier == identifier then
516 end 548 end
517 end 549 end
518 if identifier_found then 550 if identifier_found then
519 identifier_found.log("debug", "Not cloud notifying '%s' of new MAM stanza (session still alive)", identifier); 551 identifier_found.log("debug", "Not cloud notifying '%s' of new MAM stanza (session still alive)", identifier);
520 else 552 else
521 notify_push_sevices[identifier] = push_info; 553 notify_push_services[identifier] = push_info;
522 end 554 end
523 end 555 end
524 556
525 handle_notify_request(event.stanza, to, notify_push_sevices); 557 handle_notify_request(event.stanza, to, notify_push_services, true);
526 end 558 end
527 end 559 end
528 560
529 module:hook("smacks-hibernation-start", hibernate_session); 561 module:hook("smacks-hibernation-start", hibernate_session);
530 module:hook("smacks-hibernation-end", restore_session); 562 module:hook("smacks-hibernation-end", restore_session);
533 565
534 local function send_ping(event) 566 local function send_ping(event)
535 local user = event.user; 567 local user = event.user;
536 local user_push_services = push_store:get(user); 568 local user_push_services = push_store:get(user);
537 local push_services = event.push_services or user_push_services; 569 local push_services = event.push_services or user_push_services;
538 handle_notify_request(nil, user, push_services); 570 handle_notify_request(nil, user, push_services, true);
539 end 571 end
540 -- can be used by other modules to ping one or more (or all) push endpoints 572 -- can be used by other modules to ping one or more (or all) push endpoints
541 module:hook("cloud-notify-ping", send_ping); 573 module:hook("cloud-notify-ping", send_ping);
542 574
543 module:log("info", "Module loaded"); 575 module:log("info", "Module loaded");