Changeset

1072:4dbdb1b465e8

mod_statistics: Initial version, and a rough 'prosodyctl mod_statistics top'
author Matthew Wild <mwild1@gmail.com>
date Sat, 15 Jun 2013 19:08:34 +0100 (2013-06-15)
parents 1071:8f59b45fe6a7
children 1073:fe57e9332e52
files mod_statistics/mod_statistics.lua mod_statistics/prosodytop.lua mod_statistics/stats.lib.lua mod_statistics/top.lua
diffstat 4 files changed, 620 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_statistics/mod_statistics.lua	Sat Jun 15 19:08:34 2013 +0100
@@ -0,0 +1,134 @@
+module:set_global();
+
+local stats = module:require("mod_statistics/stats");
+local filters = require "util.filters";
+local serialize = require "util.serialization".serialize;
+
+local cached_values = {};
+
+local sessions = {};
+
+local function push_stat(conn, name, value)
+	local value_str = serialize(value);
+	return conn:write((("STAT %q (%s)\n"):format(name, value_str):gsub("\\\n", "\\n")));
+end
+
+local function push_stat_to_all(name, value)
+	for conn in pairs(sessions) do
+		push_stat(conn, name, value);
+	end
+end
+
+local session_stats_tpl = ([[{
+	message_in = %d, message_out = %d;
+	presence_in = %d, presence_out = %d;
+	iq_in = %d, iq_out = %d;
+	bytes_in = %d, bytes_out = %d;
+}]]):gsub("%s", "");
+
+
+local jid_fields = {
+	c2s = "full_jid";
+	s2sin = "from_host";
+	s2sout = "to_host";
+	component = "host";
+};
+
+local function push_session_to_all(session, stats)
+	local id = tostring(session):match("[a-f0-9]+$"); -- FIXME: Better id? :/
+	local stanzas_in, stanzas_out = stats.stanzas_in, stats.stanzas_out;
+	local s = (session_stats_tpl):format(
+		stanzas_in.message, stanzas_out.message,
+		stanzas_in.presence, stanzas_out.presence,
+		stanzas_in.iq, stanzas_out.iq,
+		stats.bytes_in, stats.bytes_out);
+	local jid = session[jid_fields[session.type]] or "";
+	for conn in pairs(sessions) do
+		return conn:write(("SESS %q %q %s\n"):format(id, jid, s));
+	end
+end
+
+local available_stats = stats.stats;
+local active_sessions = stats.active_sessions;
+
+-- Handle statistics provided by other modules
+local function item_handlers(host)
+	host = host and (host.."/") or "";
+	
+	return function (event) -- Added
+		local stats = event.item.statistics;
+		local group = host..(stats.name and (stats.name.."::") or "");
+		for name, stat in pairs(stats) do
+			available_stats[group..name] = stat;
+		end
+	end, function (event) -- Removed
+		local stats = event.item.statistics;
+		local group = host..(stats.name and (stats.name.."::") or "");
+		for name, stat in pairs(stats) do
+			available_stats[group..name] = nil;
+		end
+	end;
+end
+
+module:handle_items("statistics-provider", item_handlers());
+function module.add_host(module)
+	module:handle_items("statistics-provider", item_handlers(module.host));
+end
+
+-- Network listener
+local listener = {};
+
+function listener.onconnect(conn)
+	sessions[conn] = {};
+	push_stat(conn, "version", prosody.version);
+	for name, value in pairs(cached_values) do
+		push_stat(conn, name, value);
+	end
+	conn:write("\n"); -- Signal end of first batch (for non-streaming clients)
+end
+
+function listener.onincoming(conn, data)
+end
+
+function listener.ondisconnect(conn)
+	sessions[conn] = nil;
+end
+
+function module.load()
+	if not(prosody and prosody.full_sessions) then return; end --FIXME: hack, need a proper flag
+	filters.add_filter_hook(stats.filter_hook);
+
+	module:add_timer(1, function ()
+		for stat_name, stat in pairs(available_stats) do
+			if stat.get then
+				local cached = cached_values[stat_name];
+				local new_value = stat.get();
+				if new_value ~= cached then
+					push_stat_to_all(stat_name, new_value);
+					cached_values[stat_name] = new_value;
+				end
+			end
+		end
+		for session, session_stats in pairs(active_sessions) do
+			active_sessions[session] = nil;
+			push_session_to_all(session, session_stats);
+		end
+		return 1;
+	end);
+	module:provides("net", {
+		default_port = 5782;
+		listener = listener;
+	});
+end
+function module.unload()
+	filters.remove_filter_hook(stats.filter_hook);
+end
+function module.command( args )
+	local command = args[1];
+	if command == "top" then
+		local dir = module:get_directory();
+		package.path = dir.."/?.lua;"..dir.."/?.lib.lua;"..package.path;
+		local prosodytop = require "prosodytop";
+		prosodytop.run();
+	end
+end
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_statistics/prosodytop.lua	Sat Jun 15 19:08:34 2013 +0100
@@ -0,0 +1,148 @@
+local curses = require "curses";
+local server = require "net.server_select";
+local timer = require "util.timer";
+
+assert(curses.timeout, "Incorrect version of curses library. Try 'sudo luarocks install luaposix'");
+
+local top = require "top";
+
+function main()
+	local stdscr = curses.stdscr()  -- it's a userdatum
+	--stdscr:clear();
+	local view = top.new({
+		stdscr = stdscr;
+		prosody = { up_since = os.time() };
+		conn_list = {};
+	});
+	
+	timer.add_task(0.01, function ()
+		local ch = stdscr:getch();
+		if ch then
+			if stdscr:getch() == 410 then
+				view:resized();
+			else
+				curses.ungetch(ch);
+			end
+		end
+		return 0.2;
+	end);
+	
+	timer.add_task(0, function ()
+		view:draw();
+		return 1;
+	end);
+	
+	--[[
+	posix.signal(28, function ()
+		table.insert(view.conn_list, { jid = "WINCH" });
+		--view:draw();
+	end);
+	]]
+	
+	-- Fake socket object around stdin
+	local stdin = {
+        	getfd = function () return 0; end;
+        	dirty = function (self) return false; end;
+        	settimeout = function () end;
+        	send = function (_, d) return #d, 0; end;
+        	close = function () end;
+        	receive = function (_, patt)
+        		local ch = stdscr:getch();
+        		if ch >= 0 and ch <=255 then
+        			return string.char(ch);
+        		elseif ch == 410 then
+        			view:resized();
+        		else
+        			table.insert(view.conn_list, { jid = tostring(ch) }); --FIXME
+        		end
+        		return "";
+        	end
+	};
+	local function on_incoming(stdin, text)
+		-- TODO: Handle keypresses
+		if text:lower() == "q" then os.exit(); end
+	end
+	stdin = server.wrapclient(stdin, "stdin", 0, {
+		onincoming = on_incoming, ondisconnect = function () end
+	}, "*a");
+	
+	local function handle_line(line)
+		local e = {
+			STAT = function (name) return function (value)
+				view:update_stat(name, value);
+			end end;
+			SESS = function (id) return function (jid) return function (stats)
+				view:update_session(id, jid, stats);
+			end end end;
+		};
+		local chunk = assert(loadstring(line));
+		setfenv(chunk, e);
+		chunk();
+	end
+	
+	local stats_listener = {};
+	
+	function stats_listener.onconnect(conn)
+		--stdscr:mvaddstr(6, 0, "CONNECTED");
+	end
+	
+	local partial;
+	function stats_listener.onincoming(conn, data)
+		--print("DATA", data)
+		if partial then
+			partial, data = nil, partial..data;
+		end
+		if not data:match("\n") then
+			partial = data;
+			return;
+		end
+		local lastpos = 1;
+		for line, pos in data:gmatch("([^\n]+)\n()") do
+			lastpos = pos;
+			handle_line(line);
+		end
+		if lastpos == #data then
+			partial = nil;
+		else
+			partial = data:sub(lastpos+1);
+		end
+	end
+	
+	function stats_listener.ondisconnect(conn, err)
+		stdscr:mvaddstr(6, 0, "DISCONNECTED: "..(err or "unknown"));
+	end
+	
+	local conn = require "socket".tcp();
+	assert(conn:connect("localhost", 5782));
+	handler = server.wrapclient(conn, "localhost", 5279, stats_listener, "*a");
+end
+
+return {
+	run = function ()
+		--os.setlocale("UTF-8", "all")
+
+		curses.initscr()
+		curses.cbreak()
+		curses.echo(false)  -- not noecho !
+		curses.nl(false)    -- not nonl !
+		curses.timeout(0);
+
+		local ok, err = pcall(main);
+
+		--while true do stdscr:getch() end
+		--stdscr:endwin()
+
+		if ok then
+			ok, err = xpcall(server.loop, debug.traceback);
+		end
+
+		curses.endwin();
+
+		--stdscr:refresh();
+		if not ok then
+			print(err);
+		end
+
+		print"DONE"
+	end;
+};
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_statistics/stats.lib.lua	Sat Jun 15 19:08:34 2013 +0100
@@ -0,0 +1,129 @@
+local it = require "util.iterators";
+local log = require "util.logger".init("stats");
+
+local last_cpu_wall, last_cpu_clock;
+local get_time = require "socket".gettime;
+
+local active_sessions, active_jids = {}, {};
+
+local stats = {
+	total_users = {
+		get = function () return it.count(it.keys(bare_sessions)); end
+	};
+	total_c2s = {
+		get = function () return it.count(it.keys(full_sessions)); end
+	};
+	total_s2sin = {
+		get = function () return it.count(it.keys(prosody.incoming_s2s)); end
+	};
+	total_s2sout = {
+		get = function ()
+			local count = 0;
+			for host, host_session in pairs(hosts) do
+				count = count + it.count(it.keys(host_session.s2sout));
+			end
+			return count;
+		end
+	};
+	total_component = {
+		get = function ()
+			local count = 0;
+			for host, host_session in pairs(hosts) do
+				if host_session.type == "component" then
+					local c = host_session.modules.component;
+					if c and c.connected then -- 0.9 only
+						count = count + 1;
+					end
+				end
+			end
+			return count;
+		end
+	};
+	up_since = {
+		get = function () return prosody.start_time; end;
+		tostring = function (up_since)
+			return tostring(os.time()-up_since).."s";
+		end;
+	};
+	memory_used = {
+		get = function () return collectgarbage("count")/1024; end;
+		tostring = "%0.2fMB";
+	};
+	memory_process = {
+		get = function () return pposix.meminfo().allocated/1048576; end;
+		tostring = "%0.2fMB";
+	};
+	time = {
+		tostring = function () return os.date("%T"); end;
+	};
+	cpu = {
+		get = function ()
+			local new_wall, new_clock = get_time(), os.clock();
+			local pc = 0;
+			if last_cpu_wall then
+				pc = 100/((new_wall-last_cpu_wall)/(new_clock-last_cpu_clock));
+			end
+			last_cpu_wall, last_cpu_clock = new_wall, new_clock;
+			return math.ceil(pc);
+		end;
+		tostring = "%s%%";
+	};
+};
+
+local add_statistics_filter; -- forward decl
+if prosody and prosody.full_sessions then -- start_time ensures we aren't in prosodyctl
+	setmetatable(active_sessions, {
+		__index = function ( t, k )
+			local v = {
+				bytes_in = 0, bytes_out = 0;
+				stanzas_in = {
+					message = 0, presence = 0, iq = 0;
+				};
+				stanzas_out = {
+					message = 0, presence = 0, iq = 0;
+				};
+			}
+			rawset(t, k, v);
+			return v;
+		end
+	});
+	local filters = require "util.filters";
+	local function handle_stanza_in(stanza, session)
+		local s = active_sessions[session].stanzas_in;
+		local n = s[stanza.name];
+		if n then
+			s[stanza.name] = n + 1;
+		end
+		return stanza;
+	end
+	local function handle_stanza_out(stanza, session)
+		local s = active_sessions[session].stanzas_out;
+		local n = s[stanza.name];
+		if n then
+			s[stanza.name] = n + 1;
+		end
+		return stanza;
+	end
+	local function handle_bytes_in(bytes, session)
+		local s = active_sessions[session];
+		s.bytes_in = s.bytes_in + #bytes;
+		return bytes;
+	end
+	local function handle_bytes_out(bytes, session)
+		local s = active_sessions[session];
+		s.bytes_out = s.bytes_out + #bytes;
+		return bytes;
+	end
+	function add_statistics_filter(session)
+		filters.add_filter(session, "stanzas/in", handle_stanza_in);
+		filters.add_filter(session, "stanzas/out", handle_stanza_out);
+		filters.add_filter(session, "bytes/in", handle_bytes_in);
+		filters.add_filter(session, "bytes/out", handle_bytes_out);
+	end
+end
+
+return {
+	stats = stats;
+	active_sessions = active_sessions;
+	filter_hook = add_statistics_filter;
+};
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_statistics/top.lua	Sat Jun 15 19:08:34 2013 +0100
@@ -0,0 +1,209 @@
+module("prosodytop", package.seeall);
+
+local array = require "util.array";
+local it = require "util.iterators";
+local curses = require "curses";
+local stats = require "stats".stats;
+local time = require "socket".gettime;
+
+local sessions_idle_after = 3;
+local stanza_names = {"message", "presence", "iq"};
+
+local top = {};
+top.__index = top;
+
+local status_lines = {
+	"Prosody $version - $time up $up_since, $total_users users, $cpu busy";
+	"Connections: $total_c2s c2s, $total_s2sout s2sout, $total_s2sin s2sin, $total_component component";
+	"Memory: $memory_used lua, $memory_process process"; 
+	"Stanzas in: $message_in_per_second message/s, $presence_in_per_second presence/s, $iq_in_per_second iq/s";
+	"Stanzas out: $message_out_per_second message/s, $presence_out_per_second presence/s, $iq_out_per_second iq/s";
+};
+
+function top:draw()
+	self:draw_status();
+	self:draw_column_titles();
+	self:draw_conn_list();
+	self.statuswin:refresh();
+	self.listwin:refresh();
+	--self.infowin:refresh()
+	self.stdscr:move(#status_lines,0)
+end
+
+-- Width specified as cols or % of unused space, defaults to
+-- title width if not specified
+local conn_list_columns = {
+	{ title = "ID", key = "id", width = "8" };
+	{ title = "JID", key = "jid", width = "100%" };
+	{ title = "STANZAS IN>", key = "total_stanzas_in", align = "right" };
+	{ title = "MSG", key = "message_in", align = "right", width = "4" };
+	{ title = "PRES", key = "presence_in", align = "right", width = "4" };
+	{ title = "IQ", key = "iq_in", align = "right", width = "4" };
+	{ title = "STANZAS OUT>", key = "total_stanzas_out", align = "right" };
+	{ title = "MSG", key = "message_out", align = "right", width = "4" };
+	{ title = "PRES", key = "presence_out", align = "right", width = "4" };
+	{ title = "IQ", key = "iq_out", align = "right", width = "4" };
+	{ title = "BYTES IN", key = "bytes_in", align = "right" };
+	{ title = "BYTES OUT", key = "bytes_out", align = "right" };
+
+};
+
+function top:draw_status()
+	for row, line in ipairs(status_lines) do
+		self.statuswin:mvaddstr(row-1, 0, (line:gsub("%$([%w_]+)", self.data)));
+		self.statuswin:clrtoeol();
+	end
+	-- Clear stanza counts
+	for _, stanza_type in ipairs(stanza_names) do
+		self.prosody[stanza_type.."_in_per_second"] = 0;
+		self.prosody[stanza_type.."_out_per_second"] = 0;
+	end
+end
+
+local function padright(s, width)
+	return s..string.rep(" ", width-#s);
+end
+
+local function padleft(s, width)
+	return string.rep(" ", width-#s)..s;
+end
+
+function top:resized()
+	self:recalc_column_widths();
+	--self.stdscr:clear();
+	self:draw();
+end
+
+function top:recalc_column_widths()
+	local widths = {};
+	self.column_widths = widths;
+	local total_width = curses.cols()-4;
+	local free_width = total_width;
+	for i = 1, #conn_list_columns do
+		local width = conn_list_columns[i].width or "0";
+		if not(type(width) == "string" and width:sub(-1) == "%") then
+			width = math.max(tonumber(width), #conn_list_columns[i].title+1);
+			widths[i] = width;
+			free_width = free_width - width;
+		end
+	end
+	for i = 1, #conn_list_columns do
+		if not widths[i] then
+			local pc_width = tonumber((conn_list_columns[i].width:gsub("%%$", "")));
+			widths[i] = math.floor(free_width*(pc_width/100));
+		end
+	end
+	return widths;
+end
+
+function top:draw_column_titles()
+	local widths = self.column_widths;
+	self.listwin:attron(curses.A_REVERSE);
+	self.listwin:mvaddstr(0, 0, "  ");
+	for i, column in ipairs(conn_list_columns) do
+		self.listwin:addstr(padright(column.title, widths[i]));
+	end
+	self.listwin:addstr("  ");
+	self.listwin:attroff(curses.A_REVERSE);
+end
+
+local function session_compare(session1, session2)
+	local stats1, stats2 = session1.stats, session2.stats;
+	return (stats1.total_stanzas_in + stats1.total_stanzas_out) >= 
+		(stats2.total_stanzas_in + stats2.total_stanzas_out);
+end
+
+function top:draw_conn_list()
+	local rows = curses.lines()-(#status_lines+2)-5;
+	local cutoff_time = time() - sessions_idle_after;
+	local widths = self.column_widths;
+	local top_sessions = array.collect(it.values(self.active_sessions)):sort(session_compare);
+	for index = 1, rows do
+		session = top_sessions[index];
+		if session then
+			if session.last_update < cutoff_time then
+				self.active_sessions[session.id] = nil;
+			else
+				local row = {};
+				for i, column in ipairs(conn_list_columns) do
+					local width = widths[i];
+					local v = tostring(session[column.key] or ""):sub(1, width);
+					if #v < width then
+						if column.align == "right" then
+							v = padleft(v, width-1).." ";
+						else
+							v = padright(v, width);
+						end
+					end
+					table.insert(row, v);
+				end
+				self.listwin:mvaddstr(index, 0, "  "..table.concat(row));
+			end
+		else
+			-- FIXME: How to clear a line? It's 5am and I don't feel like reading docs.
+			self.listwin:move(index, 0);
+			self.listwin:clrtoeol();
+		end
+	end
+end
+
+function top:update_stat(name, value)
+	self.prosody[name] = value;
+end
+
+function top:update_session(id, jid, stats)
+	self.active_sessions[id] = stats;
+	stats.id, stats.jid, stats.stats = id, jid, stats;
+	stats.total_bytes = stats.bytes_in + stats.bytes_out;
+	for _, stanza_type in ipairs(stanza_names) do
+		self.prosody[stanza_type.."_in_per_second"] = (self.prosody[stanza_type.."_in_per_second"] or 0) + stats[stanza_type.."_in"];
+		self.prosody[stanza_type.."_out_per_second"] = (self.prosody[stanza_type.."_out_per_second"] or 0) + stats[stanza_type.."_out"];
+	end
+	stats.total_stanzas_in = stats.message_in + stats.presence_in + stats.iq_in;
+	stats.total_stanzas_out = stats.message_out + stats.presence_out + stats.iq_out;
+	stats.last_update = time();
+end
+
+function new(base)
+	setmetatable(base, top);
+	base.data = setmetatable({}, {
+		__index = function (t, k)
+			local stat = stats[k];
+			if stat and stat.tostring then
+				if type(stat.tostring) == "function" then
+					return stat.tostring(base.prosody[k]);
+				elseif type(stat.tostring) == "string" then
+					local v = base.prosody[k];
+					if v == nil then
+						return "?";
+					end
+					return (stat.tostring):format(v);
+				end
+			end
+			return base.prosody[k];
+		end;
+	});
+
+	base.active_sessions = {};
+
+	base.statuswin = curses.newwin(#status_lines, 0, 0, 0);
+
+	base.promptwin = curses.newwin(1, 0, #status_lines, 0);
+	base.promptwin:addstr("");
+	base.promptwin:refresh();
+
+	base.listwin = curses.newwin(curses.lines()-(#status_lines+2)-5, 0, #status_lines+1, 0);
+	base.listwin:syncok();
+	
+	base.infowin = curses.newwin(5, 0, curses.lines()-5, 0);
+	base.infowin:mvaddstr(1, 1, "Hello world");
+	base.infowin:border(0,0,0,0);
+	base.infowin:syncok();
+	base.infowin:refresh();
+
+	base:resized();
+	
+	return base;
+end
+
+return _M;