# HG changeset patch # User Matthew Wild # Date 1371319714 -3600 # Node ID 4dbdb1b465e8402d1af1b4bff1292d0fd926bf87 # Parent 8f59b45fe6a77a1150cfcbdb64403cc5949985af mod_statistics: Initial version, and a rough 'prosodyctl mod_statistics top' diff -r 8f59b45fe6a7 -r 4dbdb1b465e8 mod_statistics/mod_statistics.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_statistics/mod_statistics.lua Sat Jun 15 19:08:34 2013 +0100 @@ -0,0 +1,134 @@ +module:set_global(); + +local stats = module:require("mod_statistics/stats"); +local filters = require "util.filters"; +local serialize = require "util.serialization".serialize; + +local cached_values = {}; + +local sessions = {}; + +local function push_stat(conn, name, value) + local value_str = serialize(value); + return conn:write((("STAT %q (%s)\n"):format(name, value_str):gsub("\\\n", "\\n"))); +end + +local function push_stat_to_all(name, value) + for conn in pairs(sessions) do + push_stat(conn, name, value); + end +end + +local session_stats_tpl = ([[{ + message_in = %d, message_out = %d; + presence_in = %d, presence_out = %d; + iq_in = %d, iq_out = %d; + bytes_in = %d, bytes_out = %d; +}]]):gsub("%s", ""); + + +local jid_fields = { + c2s = "full_jid"; + s2sin = "from_host"; + s2sout = "to_host"; + component = "host"; +}; + +local function push_session_to_all(session, stats) + local id = tostring(session):match("[a-f0-9]+$"); -- FIXME: Better id? :/ + local stanzas_in, stanzas_out = stats.stanzas_in, stats.stanzas_out; + local s = (session_stats_tpl):format( + stanzas_in.message, stanzas_out.message, + stanzas_in.presence, stanzas_out.presence, + stanzas_in.iq, stanzas_out.iq, + stats.bytes_in, stats.bytes_out); + local jid = session[jid_fields[session.type]] or ""; + for conn in pairs(sessions) do + return conn:write(("SESS %q %q %s\n"):format(id, jid, s)); + end +end + +local available_stats = stats.stats; +local active_sessions = stats.active_sessions; + +-- Handle statistics provided by other modules +local function item_handlers(host) + host = host and (host.."/") or ""; + + return function (event) -- Added + local stats = event.item.statistics; + local group = host..(stats.name and (stats.name.."::") or ""); + for name, stat in pairs(stats) do + available_stats[group..name] = stat; + end + end, function (event) -- Removed + local stats = event.item.statistics; + local group = host..(stats.name and (stats.name.."::") or ""); + for name, stat in pairs(stats) do + available_stats[group..name] = nil; + end + end; +end + +module:handle_items("statistics-provider", item_handlers()); +function module.add_host(module) + module:handle_items("statistics-provider", item_handlers(module.host)); +end + +-- Network listener +local listener = {}; + +function listener.onconnect(conn) + sessions[conn] = {}; + push_stat(conn, "version", prosody.version); + for name, value in pairs(cached_values) do + push_stat(conn, name, value); + end + conn:write("\n"); -- Signal end of first batch (for non-streaming clients) +end + +function listener.onincoming(conn, data) +end + +function listener.ondisconnect(conn) + sessions[conn] = nil; +end + +function module.load() + if not(prosody and prosody.full_sessions) then return; end --FIXME: hack, need a proper flag + filters.add_filter_hook(stats.filter_hook); + + module:add_timer(1, function () + for stat_name, stat in pairs(available_stats) do + if stat.get then + local cached = cached_values[stat_name]; + local new_value = stat.get(); + if new_value ~= cached then + push_stat_to_all(stat_name, new_value); + cached_values[stat_name] = new_value; + end + end + end + for session, session_stats in pairs(active_sessions) do + active_sessions[session] = nil; + push_session_to_all(session, session_stats); + end + return 1; + end); + module:provides("net", { + default_port = 5782; + listener = listener; + }); +end +function module.unload() + filters.remove_filter_hook(stats.filter_hook); +end +function module.command( args ) + local command = args[1]; + if command == "top" then + local dir = module:get_directory(); + package.path = dir.."/?.lua;"..dir.."/?.lib.lua;"..package.path; + local prosodytop = require "prosodytop"; + prosodytop.run(); + end +end diff -r 8f59b45fe6a7 -r 4dbdb1b465e8 mod_statistics/prosodytop.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_statistics/prosodytop.lua Sat Jun 15 19:08:34 2013 +0100 @@ -0,0 +1,148 @@ +local curses = require "curses"; +local server = require "net.server_select"; +local timer = require "util.timer"; + +assert(curses.timeout, "Incorrect version of curses library. Try 'sudo luarocks install luaposix'"); + +local top = require "top"; + +function main() + local stdscr = curses.stdscr() -- it's a userdatum + --stdscr:clear(); + local view = top.new({ + stdscr = stdscr; + prosody = { up_since = os.time() }; + conn_list = {}; + }); + + timer.add_task(0.01, function () + local ch = stdscr:getch(); + if ch then + if stdscr:getch() == 410 then + view:resized(); + else + curses.ungetch(ch); + end + end + return 0.2; + end); + + timer.add_task(0, function () + view:draw(); + return 1; + end); + + --[[ + posix.signal(28, function () + table.insert(view.conn_list, { jid = "WINCH" }); + --view:draw(); + end); + ]] + + -- Fake socket object around stdin + local stdin = { + getfd = function () return 0; end; + dirty = function (self) return false; end; + settimeout = function () end; + send = function (_, d) return #d, 0; end; + close = function () end; + receive = function (_, patt) + local ch = stdscr:getch(); + if ch >= 0 and ch <=255 then + return string.char(ch); + elseif ch == 410 then + view:resized(); + else + table.insert(view.conn_list, { jid = tostring(ch) }); --FIXME + end + return ""; + end + }; + local function on_incoming(stdin, text) + -- TODO: Handle keypresses + if text:lower() == "q" then os.exit(); end + end + stdin = server.wrapclient(stdin, "stdin", 0, { + onincoming = on_incoming, ondisconnect = function () end + }, "*a"); + + local function handle_line(line) + local e = { + STAT = function (name) return function (value) + view:update_stat(name, value); + end end; + SESS = function (id) return function (jid) return function (stats) + view:update_session(id, jid, stats); + end end end; + }; + local chunk = assert(loadstring(line)); + setfenv(chunk, e); + chunk(); + end + + local stats_listener = {}; + + function stats_listener.onconnect(conn) + --stdscr:mvaddstr(6, 0, "CONNECTED"); + end + + local partial; + function stats_listener.onincoming(conn, data) + --print("DATA", data) + if partial then + partial, data = nil, partial..data; + end + if not data:match("\n") then + partial = data; + return; + end + local lastpos = 1; + for line, pos in data:gmatch("([^\n]+)\n()") do + lastpos = pos; + handle_line(line); + end + if lastpos == #data then + partial = nil; + else + partial = data:sub(lastpos+1); + end + end + + function stats_listener.ondisconnect(conn, err) + stdscr:mvaddstr(6, 0, "DISCONNECTED: "..(err or "unknown")); + end + + local conn = require "socket".tcp(); + assert(conn:connect("localhost", 5782)); + handler = server.wrapclient(conn, "localhost", 5279, stats_listener, "*a"); +end + +return { + run = function () + --os.setlocale("UTF-8", "all") + + curses.initscr() + curses.cbreak() + curses.echo(false) -- not noecho ! + curses.nl(false) -- not nonl ! + curses.timeout(0); + + local ok, err = pcall(main); + + --while true do stdscr:getch() end + --stdscr:endwin() + + if ok then + ok, err = xpcall(server.loop, debug.traceback); + end + + curses.endwin(); + + --stdscr:refresh(); + if not ok then + print(err); + end + + print"DONE" + end; +}; diff -r 8f59b45fe6a7 -r 4dbdb1b465e8 mod_statistics/stats.lib.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_statistics/stats.lib.lua Sat Jun 15 19:08:34 2013 +0100 @@ -0,0 +1,129 @@ +local it = require "util.iterators"; +local log = require "util.logger".init("stats"); + +local last_cpu_wall, last_cpu_clock; +local get_time = require "socket".gettime; + +local active_sessions, active_jids = {}, {}; + +local stats = { + total_users = { + get = function () return it.count(it.keys(bare_sessions)); end + }; + total_c2s = { + get = function () return it.count(it.keys(full_sessions)); end + }; + total_s2sin = { + get = function () return it.count(it.keys(prosody.incoming_s2s)); end + }; + total_s2sout = { + get = function () + local count = 0; + for host, host_session in pairs(hosts) do + count = count + it.count(it.keys(host_session.s2sout)); + end + return count; + end + }; + total_component = { + get = function () + local count = 0; + for host, host_session in pairs(hosts) do + if host_session.type == "component" then + local c = host_session.modules.component; + if c and c.connected then -- 0.9 only + count = count + 1; + end + end + end + return count; + end + }; + up_since = { + get = function () return prosody.start_time; end; + tostring = function (up_since) + return tostring(os.time()-up_since).."s"; + end; + }; + memory_used = { + get = function () return collectgarbage("count")/1024; end; + tostring = "%0.2fMB"; + }; + memory_process = { + get = function () return pposix.meminfo().allocated/1048576; end; + tostring = "%0.2fMB"; + }; + time = { + tostring = function () return os.date("%T"); end; + }; + cpu = { + get = function () + local new_wall, new_clock = get_time(), os.clock(); + local pc = 0; + if last_cpu_wall then + pc = 100/((new_wall-last_cpu_wall)/(new_clock-last_cpu_clock)); + end + last_cpu_wall, last_cpu_clock = new_wall, new_clock; + return math.ceil(pc); + end; + tostring = "%s%%"; + }; +}; + +local add_statistics_filter; -- forward decl +if prosody and prosody.full_sessions then -- start_time ensures we aren't in prosodyctl + setmetatable(active_sessions, { + __index = function ( t, k ) + local v = { + bytes_in = 0, bytes_out = 0; + stanzas_in = { + message = 0, presence = 0, iq = 0; + }; + stanzas_out = { + message = 0, presence = 0, iq = 0; + }; + } + rawset(t, k, v); + return v; + end + }); + local filters = require "util.filters"; + local function handle_stanza_in(stanza, session) + local s = active_sessions[session].stanzas_in; + local n = s[stanza.name]; + if n then + s[stanza.name] = n + 1; + end + return stanza; + end + local function handle_stanza_out(stanza, session) + local s = active_sessions[session].stanzas_out; + local n = s[stanza.name]; + if n then + s[stanza.name] = n + 1; + end + return stanza; + end + local function handle_bytes_in(bytes, session) + local s = active_sessions[session]; + s.bytes_in = s.bytes_in + #bytes; + return bytes; + end + local function handle_bytes_out(bytes, session) + local s = active_sessions[session]; + s.bytes_out = s.bytes_out + #bytes; + return bytes; + end + function add_statistics_filter(session) + filters.add_filter(session, "stanzas/in", handle_stanza_in); + filters.add_filter(session, "stanzas/out", handle_stanza_out); + filters.add_filter(session, "bytes/in", handle_bytes_in); + filters.add_filter(session, "bytes/out", handle_bytes_out); + end +end + +return { + stats = stats; + active_sessions = active_sessions; + filter_hook = add_statistics_filter; +}; diff -r 8f59b45fe6a7 -r 4dbdb1b465e8 mod_statistics/top.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mod_statistics/top.lua Sat Jun 15 19:08:34 2013 +0100 @@ -0,0 +1,209 @@ +module("prosodytop", package.seeall); + +local array = require "util.array"; +local it = require "util.iterators"; +local curses = require "curses"; +local stats = require "stats".stats; +local time = require "socket".gettime; + +local sessions_idle_after = 3; +local stanza_names = {"message", "presence", "iq"}; + +local top = {}; +top.__index = top; + +local status_lines = { + "Prosody $version - $time up $up_since, $total_users users, $cpu busy"; + "Connections: $total_c2s c2s, $total_s2sout s2sout, $total_s2sin s2sin, $total_component component"; + "Memory: $memory_used lua, $memory_process process"; + "Stanzas in: $message_in_per_second message/s, $presence_in_per_second presence/s, $iq_in_per_second iq/s"; + "Stanzas out: $message_out_per_second message/s, $presence_out_per_second presence/s, $iq_out_per_second iq/s"; +}; + +function top:draw() + self:draw_status(); + self:draw_column_titles(); + self:draw_conn_list(); + self.statuswin:refresh(); + self.listwin:refresh(); + --self.infowin:refresh() + self.stdscr:move(#status_lines,0) +end + +-- Width specified as cols or % of unused space, defaults to +-- title width if not specified +local conn_list_columns = { + { title = "ID", key = "id", width = "8" }; + { title = "JID", key = "jid", width = "100%" }; + { title = "STANZAS IN>", key = "total_stanzas_in", align = "right" }; + { title = "MSG", key = "message_in", align = "right", width = "4" }; + { title = "PRES", key = "presence_in", align = "right", width = "4" }; + { title = "IQ", key = "iq_in", align = "right", width = "4" }; + { title = "STANZAS OUT>", key = "total_stanzas_out", align = "right" }; + { title = "MSG", key = "message_out", align = "right", width = "4" }; + { title = "PRES", key = "presence_out", align = "right", width = "4" }; + { title = "IQ", key = "iq_out", align = "right", width = "4" }; + { title = "BYTES IN", key = "bytes_in", align = "right" }; + { title = "BYTES OUT", key = "bytes_out", align = "right" }; + +}; + +function top:draw_status() + for row, line in ipairs(status_lines) do + self.statuswin:mvaddstr(row-1, 0, (line:gsub("%$([%w_]+)", self.data))); + self.statuswin:clrtoeol(); + end + -- Clear stanza counts + for _, stanza_type in ipairs(stanza_names) do + self.prosody[stanza_type.."_in_per_second"] = 0; + self.prosody[stanza_type.."_out_per_second"] = 0; + end +end + +local function padright(s, width) + return s..string.rep(" ", width-#s); +end + +local function padleft(s, width) + return string.rep(" ", width-#s)..s; +end + +function top:resized() + self:recalc_column_widths(); + --self.stdscr:clear(); + self:draw(); +end + +function top:recalc_column_widths() + local widths = {}; + self.column_widths = widths; + local total_width = curses.cols()-4; + local free_width = total_width; + for i = 1, #conn_list_columns do + local width = conn_list_columns[i].width or "0"; + if not(type(width) == "string" and width:sub(-1) == "%") then + width = math.max(tonumber(width), #conn_list_columns[i].title+1); + widths[i] = width; + free_width = free_width - width; + end + end + for i = 1, #conn_list_columns do + if not widths[i] then + local pc_width = tonumber((conn_list_columns[i].width:gsub("%%$", ""))); + widths[i] = math.floor(free_width*(pc_width/100)); + end + end + return widths; +end + +function top:draw_column_titles() + local widths = self.column_widths; + self.listwin:attron(curses.A_REVERSE); + self.listwin:mvaddstr(0, 0, " "); + for i, column in ipairs(conn_list_columns) do + self.listwin:addstr(padright(column.title, widths[i])); + end + self.listwin:addstr(" "); + self.listwin:attroff(curses.A_REVERSE); +end + +local function session_compare(session1, session2) + local stats1, stats2 = session1.stats, session2.stats; + return (stats1.total_stanzas_in + stats1.total_stanzas_out) >= + (stats2.total_stanzas_in + stats2.total_stanzas_out); +end + +function top:draw_conn_list() + local rows = curses.lines()-(#status_lines+2)-5; + local cutoff_time = time() - sessions_idle_after; + local widths = self.column_widths; + local top_sessions = array.collect(it.values(self.active_sessions)):sort(session_compare); + for index = 1, rows do + session = top_sessions[index]; + if session then + if session.last_update < cutoff_time then + self.active_sessions[session.id] = nil; + else + local row = {}; + for i, column in ipairs(conn_list_columns) do + local width = widths[i]; + local v = tostring(session[column.key] or ""):sub(1, width); + if #v < width then + if column.align == "right" then + v = padleft(v, width-1).." "; + else + v = padright(v, width); + end + end + table.insert(row, v); + end + self.listwin:mvaddstr(index, 0, " "..table.concat(row)); + end + else + -- FIXME: How to clear a line? It's 5am and I don't feel like reading docs. + self.listwin:move(index, 0); + self.listwin:clrtoeol(); + end + end +end + +function top:update_stat(name, value) + self.prosody[name] = value; +end + +function top:update_session(id, jid, stats) + self.active_sessions[id] = stats; + stats.id, stats.jid, stats.stats = id, jid, stats; + stats.total_bytes = stats.bytes_in + stats.bytes_out; + for _, stanza_type in ipairs(stanza_names) do + self.prosody[stanza_type.."_in_per_second"] = (self.prosody[stanza_type.."_in_per_second"] or 0) + stats[stanza_type.."_in"]; + self.prosody[stanza_type.."_out_per_second"] = (self.prosody[stanza_type.."_out_per_second"] or 0) + stats[stanza_type.."_out"]; + end + stats.total_stanzas_in = stats.message_in + stats.presence_in + stats.iq_in; + stats.total_stanzas_out = stats.message_out + stats.presence_out + stats.iq_out; + stats.last_update = time(); +end + +function new(base) + setmetatable(base, top); + base.data = setmetatable({}, { + __index = function (t, k) + local stat = stats[k]; + if stat and stat.tostring then + if type(stat.tostring) == "function" then + return stat.tostring(base.prosody[k]); + elseif type(stat.tostring) == "string" then + local v = base.prosody[k]; + if v == nil then + return "?"; + end + return (stat.tostring):format(v); + end + end + return base.prosody[k]; + end; + }); + + base.active_sessions = {}; + + base.statuswin = curses.newwin(#status_lines, 0, 0, 0); + + base.promptwin = curses.newwin(1, 0, #status_lines, 0); + base.promptwin:addstr(""); + base.promptwin:refresh(); + + base.listwin = curses.newwin(curses.lines()-(#status_lines+2)-5, 0, #status_lines+1, 0); + base.listwin:syncok(); + + base.infowin = curses.newwin(5, 0, curses.lines()-5, 0); + base.infowin:mvaddstr(1, 1, "Hello world"); + base.infowin:border(0,0,0,0); + base.infowin:syncok(); + base.infowin:refresh(); + + base:resized(); + + return base; +end + +return _M;