Comparison

mod_statistics_statsman/mod_statistics_statsman.lua @ 3158:2558ece20e58

mod_statistics_statsman: Streaming access to statsmanager
author Kim Alvefur <zash@zash.se>
date Thu, 28 Jun 2018 21:42:48 +0200
child 3169:5d58bdbfe024
comparison
equal deleted inserted replaced
3157:8a870e0319db 3158:2558ece20e58
-- Module preamble: mark the module as global, pull in dependencies and
-- read the configuration this module depends on.
module:set_global();

local statsman = require "core.statsmanager";
local time_now = require "util.time".now;
local filters = require "util.filters";
local serialize = require "util.serialization".serialize;

-- "statistics_interval" is read from the "*" context and defaults to 60.
-- It is only reported to clients (see the "statistics_interval" STAT line),
-- not used for scheduling in this file.
local statistics_interval = module:context("*"):get_option_number("statistics_interval", 60);

-- Bail out of loading unless the "statistics" option is "internal"
-- (which is also the value assumed when the option is unset).
if module:context("*"):get_option("statistics", "internal") ~= "internal" then
	module:log("error", "Not using internal statistics, can't do anything");
	return;
end

-- Set of connected stats clients: sessions[conn] = true while connected.
local sessions = {};
15
-- Translation table from internal statistic names to the names sent to
-- clients. Names without an entry here are sent unchanged.
local name_map = {
	start_time = "up_since",
	["cpu.percent:amount"] = "cpu",
	["/*/mod_c2s/connections:amount"] = "total_c2s",
	["/*/mod_s2s/connections:amount"] = "total_s2s",
};

-- The memory statistics all follow the same "memory.<kind>:size" ->
-- "memory_<kind>" scheme, so generate those entries.
for _, kind in ipairs({
	"allocated_mmap", "allocated", "lua", "returnable",
	"rss", "total", "unused", "used",
}) do
	name_map["memory." .. kind .. ":size"] = "memory_" .. kind;
end
30
-- Send a single statistic to one connection as a line of the form
--   STAT "<name>" (<serialized value>)
-- The name is looked up in name_map first and sent as-is when there is no
-- entry. Any backslash immediately followed by a newline in the formatted
-- line is rewritten to the two characters "\n" (presumably so that a quoted
-- name cannot span several lines).
-- Returns whatever conn:write() returns.
local function push_stat(conn, name, value)
	local serialized = serialize(value);
	local wire_name = name_map[name] or name;
	local line = string.format("STAT %q (%s)\n", wire_name, serialized);
	-- Single assignment target: gsub's replacement count is discarded.
	line = line:gsub("\\\n", "\\n");
	return conn:write(line);
end
36
-- Send one statistic to every connection currently present in `sessions`.
local function push_stat_to_all(name, value)
	local conn = next(sessions);
	while conn ~= nil do
		push_stat(conn, name, value);
		conn = next(sessions, conn);
	end
end
42
-- Format template for the per-session counter table sent in SESS lines.
-- It contains no whitespace and takes eight %d arguments, in the order
-- message in/out, presence in/out, iq in/out, bytes in/out.
local session_stats_tpl;
do
	local groups = {};
	for i, field in ipairs({ "message", "presence", "iq", "bytes" }) do
		groups[i] = ("%s_in=%%d,%s_out=%%d;"):format(field, field);
	end
	session_stats_tpl = "{" .. table.concat(groups) .. "}";
end
49
50
-- For each known session type, the name of the session field whose value
-- is reported as that session's address in SESS lines.
local jid_fields = {
	["c2s"] = "full_jid",
	["s2sin"] = "from_host",
	["s2sout"] = "to_host",
	["component"] = "host",
};
57
-- Send the traffic counters of one session to every connected stats client
-- as a line of the form
--   SESS "<id>" "<jid>" {message_in=..,message_out=..;...}
--
-- session: the session the counters belong to; its `type` field selects
--          (via jid_fields) which field is reported as the address. An
--          empty string is sent when the type is unknown or the field unset.
-- stats:   table with numeric bytes_in / bytes_out and stanzas_in /
--          stanzas_out sub-tables holding message, presence and iq counts.
local function push_session_to_all(session, stats)
	-- Nothing to do (and nothing worth formatting) when nobody is connected.
	if next(sessions) == nil then
		return;
	end

	-- FIXME: Better id? :/
	-- The id is the trailing lower-case hex run of tostring(session). That
	-- match fails when the string does not end that way (e.g. a custom
	-- __tostring), and %q would then raise on a nil argument, so fall back
	-- to the complete string.
	local session_str = tostring(session);
	local id = session_str:match("[a-f0-9]+$") or session_str;

	local stanzas_in, stanzas_out = stats.stanzas_in, stats.stanzas_out;
	local s = (session_stats_tpl):format(
		stanzas_in.message, stanzas_out.message,
		stanzas_in.presence, stanzas_out.presence,
		stanzas_in.iq, stanzas_out.iq,
		stats.bytes_in, stats.bytes_out);
	local jid = session[jid_fields[session.type]] or "";

	-- The line is identical for every client, so build it once.
	local line = ("SESS %q %q %s\n"):format(id, jid, s);
	for conn in pairs(sessions) do
		conn:write(line);
	end
end
71
-- Traffic counters keyed by session object.
local active_sessions = {};

-- Network listener
local listener = {};

-- A stats client connected: register it and send the initial batch
-- (fixed header values followed by everything statsman currently reports).
listener.onconnect = function (conn)
	sessions[conn] = true;
	push_stat(conn, "version", prosody.version);
	push_stat(conn, "start_time", prosody.start_time);
	push_stat(conn, "statistics_interval", statistics_interval);
	push_stat(conn, "time", time_now());
	for stat_name, stat_value in pairs(statsman.get_stats()) do
		push_stat(conn, stat_name, stat_value);
	end
	conn:write("\n"); -- Signal end of first batch (for non-streaming clients)
end

-- Anything a client sends is ignored.
listener.onincoming = function (conn, data)
end

-- Forget the client so that nothing more is pushed to it.
listener.ondisconnect = function (conn)
	sessions[conn] = nil;
end

-- Always answers true on a read timeout (presumably this keeps idle
-- clients connected -- confirm against the network backend).
listener.onreadtimeout = function ()
	return true;
end
100
-- add_statistics_filter(session) installs the four counting filters below
-- on a session. It stays nil when the guard below is false.
local add_statistics_filter; -- forward decl
if prosody and prosody.arg then -- ensures we aren't in prosodyctl
	-- Reading active_sessions[session] for a session without an entry
	-- creates and stores a zeroed counter record for it.
	setmetatable(active_sessions, {
		__index = function (t, session)
			local counters = {
				bytes_in = 0, bytes_out = 0;
				stanzas_in = {
					message = 0, presence = 0, iq = 0;
				};
				stanzas_out = {
					message = 0, presence = 0, iq = 0;
				};
			};
			rawset(t, session, counters);
			return counters;
		end;
	});

	-- Build a stanza filter that bumps the per-name counter in the given
	-- sub-table ("stanzas_in" or "stanzas_out"). Stanza names without a
	-- pre-existing counter are not counted. The stanza is passed through
	-- unchanged.
	local function stanza_counter(direction)
		return function (stanza, session)
			local counts = active_sessions[session][direction];
			local n = counts[stanza.name];
			if n then
				counts[stanza.name] = n + 1;
			end
			return stanza;
		end
	end

	-- Build a byte filter that adds the length of each chunk to the given
	-- field ("bytes_in" or "bytes_out"). The data is passed through unchanged.
	local function byte_counter(field)
		return function (bytes, session)
			local counters = active_sessions[session];
			counters[field] = counters[field] + #bytes;
			return bytes;
		end
	end

	-- One shared handler per filter type, created once.
	local handle_stanza_in = stanza_counter("stanzas_in");
	local handle_stanza_out = stanza_counter("stanzas_out");
	local handle_bytes_in = byte_counter("bytes_in");
	local handle_bytes_out = byte_counter("bytes_out");

	function add_statistics_filter(session)
		filters.add_filter(session, "stanzas/in", handle_stanza_in);
		filters.add_filter(session, "stanzas/out", handle_stanza_out);
		filters.add_filter(session, "bytes/in", handle_bytes_in);
		filters.add_filter(session, "bytes/out", handle_bytes_out);
	end
end
152
153
-- Called when the module is loaded. Outside of the prosody/prosody.arg
-- guard it does nothing; otherwise it registers the filter hook, a timer
-- that flushes per-session counters, and the two event handlers that
-- forward statistics to connected clients.
function module.load()
	if not prosody or not prosody.arg then
		return;
	end

	-- Push every accumulated counter record to the clients and drop it.
	-- Returning 1 asks for the timer to be run again (registered with 1 too).
	local function flush_session_stats()
		for session, counters in pairs(active_sessions) do
			active_sessions[session] = nil;
			push_session_to_all(session, counters);
		end
		return 1;
	end

	-- Forward the current time followed by every changed statistic.
	local function on_stats_updated(event)
		local changed = event.changed_stats;
		push_stat_to_all("time", time_now());
		for stat_name, stat_value in pairs(changed) do
			push_stat_to_all(stat_name, stat_value);
		end
	end

	-- Tell clients when the server began stopping.
	local function on_server_stopping()
		push_stat_to_all("stop_time", time_now());
	end

	filters.add_filter_hook(add_statistics_filter);
	module:add_timer(1, flush_session_stats);
	module:hook("stats-updated", on_stats_updated);
	module:hook("server-stopping", on_server_stopping);
end
-- Called when the module is unloaded: unregister the filter hook that
-- module.load() registered. Filters already added to sessions are not
-- touched here.
module.unload = function ()
	filters.remove_filter_hook(add_statistics_filter);
end
183
-- Register the listener as a "net" service, under the same guard used
-- elsewhere in this file.
if prosody and prosody.arg then
	local net_service = {
		listener = listener,
		default_port = 5782,
		private = true,
	};
	module:provides("net", net_service);
end