Diff

mod_statistics_statsman/mod_statistics_statsman.lua @ 3158:2558ece20e58

mod_statistics_statsman: Streaming access to statsmanager
author Kim Alvefur <zash@zash.se>
date Thu, 28 Jun 2018 21:42:48 +0200
child 3169:5d58bdbfe024
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_statistics_statsman/mod_statistics_statsman.lua	Thu Jun 28 21:42:48 2018 +0200
@@ -0,0 +1,190 @@
+module:set_global();
+
+local statsman = require "core.statsmanager";
+local time_now = require "util.time".now;
+local filters = require "util.filters";
+local serialize = require "util.serialization".serialize;
+
+local statistics_interval = module:context("*"):get_option_number("statistics_interval", 60);
+if module:context("*"):get_option("statistics", "internal") ~= "internal" then
+	module:log("error", "Not using internal statistics, can't do anyting");
+	return;
+end
+
+local sessions = {};
+
+local name_map = {
+	["start_time"] = "up_since";
+	["cpu.percent:amount"] = "cpu";
+	["memory.allocated_mmap:size"] = "memory_allocated_mmap";
+	["memory.allocated:size"] = "memory_allocated";
+	["memory.lua:size"] = "memory_lua";
+	["memory.returnable:size"] = "memory_returnable";
+	["memory.rss:size"] = "memory_rss";
+	["memory.total:size"] = "memory_total";
+	["memory.unused:size"] = "memory_unused";
+	["memory.used:size"] = "memory_used";
+	["/*/mod_c2s/connections:amount"] = "total_c2s";
+	["/*/mod_s2s/connections:amount"] = "total_s2s";
+};
+
+local function push_stat(conn, name, value)
+	local value_str = serialize(value);
+	name = name_map[name] or name;
+	return conn:write((("STAT %q (%s)\n"):format(name, value_str):gsub("\\\n", "\\n")));
+end
+
+local function push_stat_to_all(name, value)
+	for conn in pairs(sessions) do
+		push_stat(conn, name, value);
+	end
+end
+
+local session_stats_tpl = ([[{
+	message_in = %d, message_out = %d;
+	presence_in = %d, presence_out = %d;
+	iq_in = %d, iq_out = %d;
+	bytes_in = %d, bytes_out = %d;
+}]]):gsub("%s", "");
+
+
+local jid_fields = {
+	c2s = "full_jid";
+	s2sin = "from_host";
+	s2sout = "to_host";
+	component = "host";
+};
+
+local function push_session_to_all(session, stats)
+	local id = tostring(session):match("[a-f0-9]+$"); -- FIXME: Better id? :/
+	local stanzas_in, stanzas_out = stats.stanzas_in, stats.stanzas_out;
+	local s = (session_stats_tpl):format(
+		stanzas_in.message, stanzas_out.message,
+		stanzas_in.presence, stanzas_out.presence,
+		stanzas_in.iq, stanzas_out.iq,
+		stats.bytes_in, stats.bytes_out);
+	local jid = session[jid_fields[session.type]] or "";
+	for conn in pairs(sessions) do
+		conn:write(("SESS %q %q %s\n"):format(id, jid, s));
+	end
+end
+
+local active_sessions = {};
+
+-- Network listener
+local listener = {};
+
+function listener.onconnect(conn)
+	sessions[conn] = true;
+	push_stat(conn, "version", prosody.version);
+	push_stat(conn, "start_time", prosody.start_time);
+	push_stat(conn, "statistics_interval", statistics_interval);
+	push_stat(conn, "time", time_now());
+	local stats = statsman.get_stats();
+	for name, value in pairs(stats) do
+		push_stat(conn, name, value);
+	end
+	conn:write("\n"); -- Signal end of first batch (for non-streaming clients)
+end
+
+function listener.onincoming(conn, data)
+end
+
+function listener.ondisconnect(conn)
+	sessions[conn] = nil;
+end
+
+function listener.onreadtimeout()
+	return true;
+end
+
+local add_statistics_filter; -- forward decl
+if prosody and prosody.arg then -- ensures we aren't in prosodyctl
+	setmetatable(active_sessions, {
+		__index = function ( t, k )
+			local v = {
+				bytes_in = 0, bytes_out = 0;
+				stanzas_in = {
+					message = 0, presence = 0, iq = 0;
+				};
+				stanzas_out = {
+					message = 0, presence = 0, iq = 0;
+				};
+			}
+			rawset(t, k, v);
+			return v;
+		end
+	});
+	local filters = require "util.filters";
+	local function handle_stanza_in(stanza, session)
+		local s = active_sessions[session].stanzas_in;
+		local n = s[stanza.name];
+		if n then
+			s[stanza.name] = n + 1;
+		end
+		return stanza;
+	end
+	local function handle_stanza_out(stanza, session)
+		local s = active_sessions[session].stanzas_out;
+		local n = s[stanza.name];
+		if n then
+			s[stanza.name] = n + 1;
+		end
+		return stanza;
+	end
+	local function handle_bytes_in(bytes, session)
+		local s = active_sessions[session];
+		s.bytes_in = s.bytes_in + #bytes;
+		return bytes;
+	end
+	local function handle_bytes_out(bytes, session)
+		local s = active_sessions[session];
+		s.bytes_out = s.bytes_out + #bytes;
+		return bytes;
+	end
+	function add_statistics_filter(session)
+		filters.add_filter(session, "stanzas/in", handle_stanza_in);
+		filters.add_filter(session, "stanzas/out", handle_stanza_out);
+		filters.add_filter(session, "bytes/in", handle_bytes_in);
+		filters.add_filter(session, "bytes/out", handle_bytes_out);
+	end
+end
+
+
+function module.load()
+	if not(prosody and prosody.arg) then
+		return;
+	end
+	filters.add_filter_hook(add_statistics_filter);
+
+	module:add_timer(1, function ()
+		for session, session_stats in pairs(active_sessions) do
+			active_sessions[session] = nil;
+			push_session_to_all(session, session_stats);
+		end
+		return 1;
+	end);
+
+	module:hook("stats-updated", function (event)
+		local stats = event.changed_stats;
+		push_stat_to_all("time", time_now());
+		for name, value in pairs(stats) do
+			push_stat_to_all(name, value);
+		end
+	end);
+
+	module:hook("server-stopping", function ()
+		push_stat_to_all("stop_time", time_now());
+	end);
+end
+function module.unload()
+	filters.remove_filter_hook(add_statistics_filter);
+end
+
+if prosody and prosody.arg then
+	module:provides("net", {
+		default_port = 5782;
+		listener = listener;
+		private = true;
+	});
+end