Software /
code /
prosody
Comparison
plugins/mod_smacks.lua @ 12125:649268c9f603
mod_smacks: sprinkle some metrics on it
author | Jonas Schäfer <jonas@wielicki.name> |
---|---|
date | Mon, 27 Dec 2021 16:16:53 +0100 |
parent | 12114:e32f90c81519 |
child | 12135:fa804c2db747 |
comparison
equal
deleted
inserted
replaced
12124:7d985e5bc1fb | 12125:649268c9f603 |
---|---|
12 -- | 12 -- |
13 | 13 |
14 local tonumber = tonumber; | 14 local tonumber = tonumber; |
15 local tostring = tostring; | 15 local tostring = tostring; |
16 local os_time = os.time; | 16 local os_time = os.time; |
17 | |
18 -- These metrics together make it possible to calculate an instantaneous | |
19 -- "unacked stanzas" metric in the graphing frontend, without us having to | |
20 -- iterate over all the queues. | |
21 local tx_queued_stanzas = module:measure("tx_queued_stanzas", "counter"); | |
22 local tx_dropped_stanzas = module:metric( | |
23 "histogram", | |
24 "tx_dropped_stanzas", "", "number of stanzas in a queue which got dropped", | |
25 {}, | |
26 {buckets = {0, 1, 2, 4, 8, 16, 32}} | |
27 ):with_labels(); | |
28 local tx_acked_stanzas = module:metric( | |
29 "histogram", | |
30 "tx_acked_stanzas", "", "number of items acked per ack received", | |
31 {}, | |
32 {buckets = {0, 1, 2, 4, 8, 16, 32}} | |
33 ):with_labels(); | |
34 | |
35 -- number of session resumption attempts where the session had expired | |
36 local resumption_expired = module:measure("session_resumption_expired", "counter"); | |
37 local resumption_age = module:metric( | |
38 "histogram", | |
39 "resumption_age", "seconds", "time the session had been hibernating at the time of a resumption", | |
40 {}, | |
41 {buckets = { 0, 1, 2, 5, 10, 20, 50, 100, 200, 500 }} | |
42 ):with_labels(); | |
43 local sessions_expired = module:measure("sessions_expired", "counter"); | |
44 local sessions_started = module:measure("sessions_started", "counter"); | |
45 | |
17 | 46 |
18 local datetime = require "util.datetime"; | 47 local datetime = require "util.datetime"; |
19 local add_filter = require "util.filters".add_filter; | 48 local add_filter = require "util.filters".add_filter; |
20 local jid = require "util.jid"; | 49 local jid = require "util.jid"; |
21 local smqueue = require "util.smqueue"; | 50 local smqueue = require "util.smqueue"; |
166 stamp = datetime.datetime() | 195 stamp = datetime.datetime() |
167 }); | 196 }); |
168 end | 197 end |
169 | 198 |
170 queue:push(cached_stanza); | 199 queue:push(cached_stanza); |
200 tx_queued_stanzas(1); | |
171 | 201 |
172 if session.hibernating then | 202 if session.hibernating then |
173 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); | 203 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); |
174 -- FIXME queue implementation changed, anything depending on it being an array will break | 204 -- FIXME queue implementation changed, anything depending on it being an array will break |
175 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza}); | 205 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza}); |
227 end | 257 end |
228 end); | 258 end); |
229 | 259 |
230 local function wrap_session_in(session, resume) | 260 local function wrap_session_in(session, resume) |
231 if not resume then | 261 if not resume then |
262 sessions_started(1); | |
232 session.handled_stanza_count = 0; | 263 session.handled_stanza_count = 0; |
233 end | 264 end |
234 add_filter(session, "stanzas/in", count_incoming_stanzas, 999); | 265 add_filter(session, "stanzas/in", count_incoming_stanzas, 999); |
235 | 266 |
236 return session; | 267 return session; |
347 origin.log("debug", "Q item %d: %s", i, item); | 378 origin.log("debug", "Q item %d: %s", i, item); |
348 end | 379 end |
349 origin:close(err); | 380 origin:close(err); |
350 return; | 381 return; |
351 end | 382 end |
352 | 383 tx_acked_stanzas:sample(handled_stanza_count); |
353 origin.log("debug", "#queue = %d", queue:count_unacked()); | 384 |
385 origin.log("debug", "#queue = %d (acked: %d)", queue:count_unacked(), handled_stanza_count); | |
354 request_ack_now_if_needed(origin, false, "handle_a", nil) | 386 request_ack_now_if_needed(origin, false, "handle_a", nil) |
355 return true; | 387 return true; |
356 end | 388 end |
357 module:hook_tag(xmlns_sm2, "a", handle_a); | 389 module:hook_tag(xmlns_sm2, "a", handle_a); |
358 module:hook_tag(xmlns_sm3, "a", handle_a); | 390 module:hook_tag(xmlns_sm3, "a", handle_a); |
359 | 391 |
360 local function handle_unacked_stanzas(session) | 392 local function handle_unacked_stanzas(session) |
361 local queue = session.outgoing_stanza_queue; | 393 local queue = session.outgoing_stanza_queue; |
362 if queue:count_unacked() > 0 then | 394 local unacked = queue:count_unacked() |
395 if unacked > 0 then | |
396 tx_dropped_stanzas:sample(unacked); | |
363 session.smacks = false; -- Disable queueing | 397 session.smacks = false; -- Disable queueing |
364 session.outgoing_stanza_queue = nil; | 398 session.outgoing_stanza_queue = nil; |
365 for stanza in queue._queue:consume() do | 399 for stanza in queue._queue:consume() do |
366 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then | 400 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then |
367 if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then | 401 if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then |
435 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; | 469 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; |
436 old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count }); | 470 old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count }); |
437 session.resumption_token = nil; | 471 session.resumption_token = nil; |
438 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore | 472 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore |
439 sessionmanager.destroy_session(session, "Hibernating too long"); | 473 sessionmanager.destroy_session(session, "Hibernating too long"); |
474 sessions_expired(1); | |
440 end); | 475 end); |
441 if session.conn then | 476 if session.conn then |
442 local conn = session.conn; | 477 local conn = session.conn; |
443 c2s_sessions[conn] = nil; | 478 c2s_sessions[conn] = nil; |
444 session.conn = nil; | 479 session.conn = nil; |
488 session.log("debug", "Tried to resume old expired session with id %s", id); | 523 session.log("debug", "Tried to resume old expired session with id %s", id); |
489 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) | 524 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) |
490 :tag("item-not-found", { xmlns = xmlns_errors }) | 525 :tag("item-not-found", { xmlns = xmlns_errors }) |
491 ); | 526 ); |
492 old_session_registry:set(session.username, id, nil); | 527 old_session_registry:set(session.username, id, nil); |
528 resumption_expired(1); | |
493 else | 529 else |
494 session.log("debug", "Tried to resume non-existent session with id %s", id); | 530 session.log("debug", "Tried to resume non-existent session with id %s", id); |
495 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 531 session.send(st.stanza("failed", { xmlns = xmlns_sm }) |
496 :tag("item-not-found", { xmlns = xmlns_errors }) | 532 :tag("item-not-found", { xmlns = xmlns_errors }) |
497 ); | 533 ); |
501 original_session.log("debug", "Letting the watchdog go"); | 537 original_session.log("debug", "Letting the watchdog go"); |
502 original_session.hibernating_watchdog:cancel(); | 538 original_session.hibernating_watchdog:cancel(); |
503 original_session.hibernating_watchdog = nil; | 539 original_session.hibernating_watchdog = nil; |
504 elseif session.hibernating then | 540 elseif session.hibernating then |
505 original_session.log("error", "Hibernating session has no watchdog!") | 541 original_session.log("error", "Hibernating session has no watchdog!") |
542 end | |
543 -- zero age = was not hibernating yet | |
544 local age = 0; | |
545 if original_session.hibernating then | |
546 local now = os_time(); | |
547 age = now - original_session.hibernating; | |
506 end | 548 end |
507 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); | 549 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); |
508 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); | 550 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); |
509 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) | 551 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) |
510 if original_session.conn then | 552 if original_session.conn then |
579 return false; | 621 return false; |
580 end | 622 end |
581 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); | 623 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); |
582 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption | 624 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption |
583 request_ack_now_if_needed(original_session, true, "handle_resume", nil); | 625 request_ack_now_if_needed(original_session, true, "handle_resume", nil); |
626 resumption_age:sample(age); | |
584 end | 627 end |
585 return true; | 628 return true; |
586 end | 629 end |
587 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); | 630 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); |
588 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); | 631 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); |