Comparison

plugins/mod_smacks.lua @ 12125:649268c9f603

mod_smacks: sprinkle some metrics on it
author Jonas Schäfer <jonas@wielicki.name>
date Mon, 27 Dec 2021 16:16:53 +0100
parent 12114:e32f90c81519
child 12135:fa804c2db747
comparison
equal deleted inserted replaced
12124:7d985e5bc1fb 12125:649268c9f603
12 -- 12 --
13 13
14 local tonumber = tonumber; 14 local tonumber = tonumber;
15 local tostring = tostring; 15 local tostring = tostring;
16 local os_time = os.time; 16 local os_time = os.time;
17
18 -- Together, these metrics make it possible to calculate an instantaneous
19 -- "unacked stanzas" metric in the graphing frontend, without us having to
20 -- iterate over all the queues.
21 local tx_queued_stanzas = module:measure("tx_queued_stanzas", "counter");
22 local tx_dropped_stanzas = module:metric(
23 "histogram",
24 "tx_dropped_stanzas", "", "number of stanzas in a queue which got dropped",
25 {},
26 {buckets = {0, 1, 2, 4, 8, 16, 32}}
27 ):with_labels();
28 local tx_acked_stanzas = module:metric(
29 "histogram",
30 "tx_acked_stanzas", "", "number of items acked per ack received",
31 {},
32 {buckets = {0, 1, 2, 4, 8, 16, 32}}
33 ):with_labels();
34
35 -- number of session resumption attempts where the session had expired
36 local resumption_expired = module:measure("session_resumption_expired", "counter");
37 local resumption_age = module:metric(
38 "histogram",
39 "resumption_age", "seconds", "time the session had been hibernating at the time of a resumption",
40 {},
41 {buckets = { 0, 1, 2, 5, 10, 20, 50, 100, 200, 500 }}
42 ):with_labels();
43 local sessions_expired = module:measure("sessions_expired", "counter");
44 local sessions_started = module:measure("sessions_started", "counter");
45
17 46
18 local datetime = require "util.datetime"; 47 local datetime = require "util.datetime";
19 local add_filter = require "util.filters".add_filter; 48 local add_filter = require "util.filters".add_filter;
20 local jid = require "util.jid"; 49 local jid = require "util.jid";
21 local smqueue = require "util.smqueue"; 50 local smqueue = require "util.smqueue";
166 stamp = datetime.datetime() 195 stamp = datetime.datetime()
167 }); 196 });
168 end 197 end
169 198
170 queue:push(cached_stanza); 199 queue:push(cached_stanza);
200 tx_queued_stanzas(1);
171 201
172 if session.hibernating then 202 if session.hibernating then
173 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating)); 203 session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
174 -- FIXME queue implementation changed, anything depending on it being an array will break 204 -- FIXME queue implementation changed, anything depending on it being an array will break
175 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza}); 205 module:fire_event("smacks-hibernation-stanza-queued", {origin = session, queue = queue:table(), stanza = cached_stanza});
227 end 257 end
228 end); 258 end);
229 259
230 local function wrap_session_in(session, resume) 260 local function wrap_session_in(session, resume)
231 if not resume then 261 if not resume then
262 sessions_started(1);
232 session.handled_stanza_count = 0; 263 session.handled_stanza_count = 0;
233 end 264 end
234 add_filter(session, "stanzas/in", count_incoming_stanzas, 999); 265 add_filter(session, "stanzas/in", count_incoming_stanzas, 999);
235 266
236 return session; 267 return session;
347 origin.log("debug", "Q item %d: %s", i, item); 378 origin.log("debug", "Q item %d: %s", i, item);
348 end 379 end
349 origin:close(err); 380 origin:close(err);
350 return; 381 return;
351 end 382 end
352 383 tx_acked_stanzas:sample(handled_stanza_count);
353 origin.log("debug", "#queue = %d", queue:count_unacked()); 384
385 origin.log("debug", "#queue = %d (acked: %d)", queue:count_unacked(), handled_stanza_count);
354 request_ack_now_if_needed(origin, false, "handle_a", nil) 386 request_ack_now_if_needed(origin, false, "handle_a", nil)
355 return true; 387 return true;
356 end 388 end
357 module:hook_tag(xmlns_sm2, "a", handle_a); 389 module:hook_tag(xmlns_sm2, "a", handle_a);
358 module:hook_tag(xmlns_sm3, "a", handle_a); 390 module:hook_tag(xmlns_sm3, "a", handle_a);
359 391
360 local function handle_unacked_stanzas(session) 392 local function handle_unacked_stanzas(session)
361 local queue = session.outgoing_stanza_queue; 393 local queue = session.outgoing_stanza_queue;
362 if queue:count_unacked() > 0 then 394 local unacked = queue:count_unacked()
395 if unacked > 0 then
396 tx_dropped_stanzas:sample(unacked);
363 session.smacks = false; -- Disable queueing 397 session.smacks = false; -- Disable queueing
364 session.outgoing_stanza_queue = nil; 398 session.outgoing_stanza_queue = nil;
365 for stanza in queue._queue:consume() do 399 for stanza in queue._queue:consume() do
366 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then 400 if not module:fire_event("delivery/failure", { session = session, stanza = stanza }) then
367 if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then 401 if stanza.attr.type ~= "error" and stanza.attr.to ~= session.full_jid then
435 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil; 469 session_registry[jid.join(session.username, session.host, session.resumption_token)] = nil;
436 old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count }); 470 old_session_registry:set(session.username, session.resumption_token, { h = session.handled_stanza_count });
437 session.resumption_token = nil; 471 session.resumption_token = nil;
438 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore 472 session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
439 sessionmanager.destroy_session(session, "Hibernating too long"); 473 sessionmanager.destroy_session(session, "Hibernating too long");
474 sessions_expired(1);
440 end); 475 end);
441 if session.conn then 476 if session.conn then
442 local conn = session.conn; 477 local conn = session.conn;
443 c2s_sessions[conn] = nil; 478 c2s_sessions[conn] = nil;
444 session.conn = nil; 479 session.conn = nil;
488 session.log("debug", "Tried to resume old expired session with id %s", id); 523 session.log("debug", "Tried to resume old expired session with id %s", id);
489 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) }) 524 session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) })
490 :tag("item-not-found", { xmlns = xmlns_errors }) 525 :tag("item-not-found", { xmlns = xmlns_errors })
491 ); 526 );
492 old_session_registry:set(session.username, id, nil); 527 old_session_registry:set(session.username, id, nil);
528 resumption_expired(1);
493 else 529 else
494 session.log("debug", "Tried to resume non-existent session with id %s", id); 530 session.log("debug", "Tried to resume non-existent session with id %s", id);
495 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 531 session.send(st.stanza("failed", { xmlns = xmlns_sm })
496 :tag("item-not-found", { xmlns = xmlns_errors }) 532 :tag("item-not-found", { xmlns = xmlns_errors })
497 ); 533 );
501 original_session.log("debug", "Letting the watchdog go"); 537 original_session.log("debug", "Letting the watchdog go");
502 original_session.hibernating_watchdog:cancel(); 538 original_session.hibernating_watchdog:cancel();
503 original_session.hibernating_watchdog = nil; 539 original_session.hibernating_watchdog = nil;
504 elseif session.hibernating then 540 elseif session.hibernating then
505 original_session.log("error", "Hibernating session has no watchdog!") 541 original_session.log("error", "Hibernating session has no watchdog!")
542 end
543 -- zero age = was not hibernating yet
544 local age = 0;
545 if original_session.hibernating then
546 local now = os_time();
547 age = now - original_session.hibernating;
506 end 548 end
507 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session)); 549 session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
508 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session)); 550 original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
509 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) 551 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
510 if original_session.conn then 552 if original_session.conn then
579 return false; 621 return false;
580 end 622 end
581 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()}); 623 module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
582 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption 624 original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
583 request_ack_now_if_needed(original_session, true, "handle_resume", nil); 625 request_ack_now_if_needed(original_session, true, "handle_resume", nil);
626 resumption_age:sample(age);
584 end 627 end
585 return true; 628 return true;
586 end 629 end
587 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); 630 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
588 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); 631 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);