Comparison

plugins/mod_csi_simple.lua @ 10829:67a09706e56e

mod_csi_simple: Record stats of how long buffers are held. Telnet command `stats:show("buffer_hold"):histogram()` looks nice!
author Kim Alvefur <zash@zash.se>
date Sat, 09 May 2020 17:45:45 +0200
parent 10828:c12ed21f877e
child 10830:8889d5037aca
comparison
equal deleted inserted replaced
10828:c12ed21f877e 10829:67a09706e56e
99 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); 99 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()}));
100 end 100 end
101 return stanza; 101 return stanza;
102 end 102 end
103 103
104 local measure_buffer_hold = module:measure("buffer_hold", "times");
105
104 local function manage_buffer(stanza, session) 106 local function manage_buffer(stanza, session)
105 local ctr = session.csi_counter or 0; 107 local ctr = session.csi_counter or 0;
106 local flush, why = should_flush(stanza, session, ctr); 108 local flush, why = should_flush(stanza, session, ctr);
107 if flush then 109 if flush then
110 if session.csi_measure_buffer_hold then
111 session.csi_measure_buffer_hold();
112 session.csi_measure_buffer_hold = nil;
113 end
108 session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter); 114 session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter);
109 session.conn:resume_writes(); 115 session.conn:resume_writes();
110 else 116 else
111 session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter); 117 session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter);
112 stanza = with_timestamp(stanza, jid.join(session.username, session.host)) 118 stanza = with_timestamp(stanza, jid.join(session.username, session.host))
115 return stanza; 121 return stanza;
116 end 122 end
117 123
118 local function flush_buffer(data, session) 124 local function flush_buffer(data, session)
119 session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter); 125 session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter);
126 if session.csi_measure_buffer_hold then
127 session.csi_measure_buffer_hold();
128 session.csi_measure_buffer_hold = nil;
129 end
120 session.conn:resume_writes(); 130 session.conn:resume_writes();
121 return data; 131 return data;
122 end 132 end
123 133
124 function enable_optimizations(session) 134 function enable_optimizations(session)
125 if session.conn and session.conn.pause_writes then 135 if session.conn and session.conn.pause_writes then
126 session.conn:pause_writes(); 136 session.conn:pause_writes();
137 session.csi_measure_buffer_hold = measure_buffer_hold();
127 session.csi_counter = 0; 138 session.csi_counter = 0;
128 filters.add_filter(session, "stanzas/out", manage_buffer); 139 filters.add_filter(session, "stanzas/out", manage_buffer);
129 filters.add_filter(session, "bytes/in", flush_buffer); 140 filters.add_filter(session, "bytes/in", flush_buffer);
130 else 141 else
131 session.log("warn", "Session connection does not support write pausing"); 142 session.log("warn", "Session connection does not support write pausing");
134 145
135 function disable_optimizations(session) 146 function disable_optimizations(session)
136 filters.remove_filter(session, "stanzas/out", manage_buffer); 147 filters.remove_filter(session, "stanzas/out", manage_buffer);
137 filters.remove_filter(session, "bytes/in", flush_buffer); 148 filters.remove_filter(session, "bytes/in", flush_buffer);
138 session.csi_counter = nil; 149 session.csi_counter = nil;
150 if session.csi_measure_buffer_hold then
151 session.csi_measure_buffer_hold();
152 session.csi_measure_buffer_hold = nil;
153 end
139 if session.conn and session.conn.resume_writes then 154 if session.conn and session.conn.resume_writes then
140 session.conn:resume_writes(); 155 session.conn:resume_writes();
141 end 156 end
142 end 157 end
143 158
158 173
159 module:hook("c2s-ondrain", function (event) 174 module:hook("c2s-ondrain", function (event)
160 local session = event.session; 175 local session = event.session;
161 if session.state == "inactive" and session.conn and session.conn.pause_writes then 176 if session.state == "inactive" and session.conn and session.conn.pause_writes then
162 session.conn:pause_writes(); 177 session.conn:pause_writes();
178 session.csi_measure_buffer_hold = measure_buffer_hold();
163 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); 179 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter);
164 session.csi_counter = 0; 180 session.csi_counter = 0;
165 end 181 end
166 end); 182 end);
167 183