Software / code / prosody-modules
Comparison
mod_cloud_notify/mod_cloud_notify.lua @ 4324:45dcf5d4cd6c
mod_cloud_notify: fix push flooding on delayed acks
Under some circumstances the delayed-ack handling caused a push flood,
this commit prevents this and caps pushes at one push per second only.
| author | tmolitor <thilo@eightysoft.de> |
|---|---|
| date | Mon, 11 Jan 2021 22:48:17 +0100 |
| parent | 4295:d44a8d3dd571 |
| child | 4325:9b95241c6ae5 |
comparison
equal
deleted
inserted
replaced
| 4323:a7a06c8cea37 | 4324:45dcf5d4cd6c |
|---|---|
| 408 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted | 408 if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted |
| 409 end | 409 end |
| 410 end | 410 end |
| 411 | 411 |
| 412 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) | 412 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) |
| 413 local function process_smacks_stanza(event) | 413 local function process_stanza(session, stanza) |
| 414 local session = event.origin; | |
| 415 local stanza = event.stanza; | |
| 416 if session.push_identifier then | 414 if session.push_identifier then |
| 417 session.log("debug", "adding new stanza to push_queue"); | 415 session.log("debug", "adding new stanza to push_queue"); |
| 418 if not session.push_queue then session.push_queue = {}; end | 416 if not session.push_queue then session.push_queue = {}; end |
| 419 local queue = session.push_queue; | 417 local queue = session.push_queue; |
| 420 queue[#queue+1] = st.clone(stanza); | 418 queue[#queue+1] = st.clone(stanza); |
| 421 if #queue == 1 then -- first stanza --> start timer | 419 if #queue == 1 then -- first stanza --> start timer |
| 422 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); | 420 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)"); |
| 423 session.awaiting_push_timer = module:add_timer(1e-06, function () | 421 session.awaiting_push_timer = module:add_timer(1.0, function () |
| 424 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); | 422 session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)"); |
| 425 process_stanza_queue(session.push_queue, session, "push"); | 423 process_stanza_queue(session.push_queue, session, "push"); |
| 426 session.push_queue = {}; -- clean up queue after push | 424 session.push_queue = {}; -- clean up queue after push |
| 425 session.awaiting_push_timer = nil; | |
| 427 end); | 426 end); |
| 428 end | 427 end |
| 429 else | 428 end |
| 429 return stanza; | |
| 430 end | |
| 431 | |
| 432 local function process_smacks_stanza(event) | |
| 433 local session = event.origin; | |
| 434 local stanza = event.stanza; | |
| 435 if not session.push_identifier then | |
| 430 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)", | 436 session.log("debug", "NOT invoking cloud handle_notify_request() for newly smacks queued stanza (session.push_identifier is not set: %s)", |
| 431 session.push_identifier | 437 session.push_identifier |
| 432 ); | 438 ); |
| 433 end | 439 else |
| 434 return stanza; | 440 process_stanza(session, stanza) |
| 441 end | |
| 435 end | 442 end |
| 436 | 443 |
| 437 -- smacks hibernation is started | 444 -- smacks hibernation is started |
| 438 local function hibernate_session(event) | 445 local function hibernate_session(event) |
| 439 local session = event.origin; | 446 local session = event.origin; |
| 454 | 461 |
| 455 -- smacks ack is delayed | 462 -- smacks ack is delayed |
| 456 local function ack_delayed(event) | 463 local function ack_delayed(event) |
| 457 local session = event.origin; | 464 local session = event.origin; |
| 458 local queue = event.queue; | 465 local queue = event.queue; |
| 459 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | 466 if not session.push_identifier then return; end |
| 460 process_stanza_queue(queue, session, "smacks"); | 467 for i=1, #queue do |
| 468 local stanza = queue[i]; | |
| 469 -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas) | |
| 470 process_stanza(session, stanza); | |
| 471 end | |
| 461 end | 472 end |
| 462 | 473 |
| 463 -- archive message added | 474 -- archive message added |
| 464 local function archive_message_added(event) | 475 local function archive_message_added(event) |
| 465 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } | 476 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } |