Software / code / prosody
Comparison
plugins/mod_csi_simple.lua @ 10829:67a09706e56e
mod_csi_simple: Record stats of how long buffers are held
Telnet command `stats:show("buffer_hold"):histogram()` looks nice!
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Sat, 09 May 2020 17:45:45 +0200 |
| parent | 10828:c12ed21f877e |
| child | 10830:8889d5037aca |
comparison
equal
deleted
inserted
replaced
| 10828:c12ed21f877e | 10829:67a09706e56e |
|---|---|
| 99 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); | 99 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); |
| 100 end | 100 end |
| 101 return stanza; | 101 return stanza; |
| 102 end | 102 end |
| 103 | 103 |
| | 104 local measure_buffer_hold = module:measure("buffer_hold", "times"); |
| | 105 |
| 104 local function manage_buffer(stanza, session) | 106 local function manage_buffer(stanza, session) |
| 105 local ctr = session.csi_counter or 0; | 107 local ctr = session.csi_counter or 0; |
| 106 local flush, why = should_flush(stanza, session, ctr); | 108 local flush, why = should_flush(stanza, session, ctr); |
| 107 if flush then | 109 if flush then |
| | 110 if session.csi_measure_buffer_hold then |
| | 111 session.csi_measure_buffer_hold(); |
| | 112 session.csi_measure_buffer_hold = nil; |
| | 113 end |
| 108 session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter); | 114 session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter); |
| 109 session.conn:resume_writes(); | 115 session.conn:resume_writes(); |
| 110 else | 116 else |
| 111 session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter); | 117 session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter); |
| 112 stanza = with_timestamp(stanza, jid.join(session.username, session.host)) | 118 stanza = with_timestamp(stanza, jid.join(session.username, session.host)) |
| 115 return stanza; | 121 return stanza; |
| 116 end | 122 end |
| 117 | 123 |
| 118 local function flush_buffer(data, session) | 124 local function flush_buffer(data, session) |
| 119 session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter); | 125 session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter); |
| | 126 if session.csi_measure_buffer_hold then |
| | 127 session.csi_measure_buffer_hold(); |
| | 128 session.csi_measure_buffer_hold = nil; |
| | 129 end |
| 120 session.conn:resume_writes(); | 130 session.conn:resume_writes(); |
| 121 return data; | 131 return data; |
| 122 end | 132 end |
| 123 | 133 |
| 124 function enable_optimizations(session) | 134 function enable_optimizations(session) |
| 125 if session.conn and session.conn.pause_writes then | 135 if session.conn and session.conn.pause_writes then |
| 126 session.conn:pause_writes(); | 136 session.conn:pause_writes(); |
| | 137 session.csi_measure_buffer_hold = measure_buffer_hold(); |
| 127 session.csi_counter = 0; | 138 session.csi_counter = 0; |
| 128 filters.add_filter(session, "stanzas/out", manage_buffer); | 139 filters.add_filter(session, "stanzas/out", manage_buffer); |
| 129 filters.add_filter(session, "bytes/in", flush_buffer); | 140 filters.add_filter(session, "bytes/in", flush_buffer); |
| 130 else | 141 else |
| 131 session.log("warn", "Session connection does not support write pausing"); | 142 session.log("warn", "Session connection does not support write pausing"); |
| 134 | 145 |
| 135 function disable_optimizations(session) | 146 function disable_optimizations(session) |
| 136 filters.remove_filter(session, "stanzas/out", manage_buffer); | 147 filters.remove_filter(session, "stanzas/out", manage_buffer); |
| 137 filters.remove_filter(session, "bytes/in", flush_buffer); | 148 filters.remove_filter(session, "bytes/in", flush_buffer); |
| 138 session.csi_counter = nil; | 149 session.csi_counter = nil; |
| | 150 if session.csi_measure_buffer_hold then |
| | 151 session.csi_measure_buffer_hold(); |
| | 152 session.csi_measure_buffer_hold = nil; |
| | 153 end |
| 139 if session.conn and session.conn.resume_writes then | 154 if session.conn and session.conn.resume_writes then |
| 140 session.conn:resume_writes(); | 155 session.conn:resume_writes(); |
| 141 end | 156 end |
| 142 end | 157 end |
| 143 | 158 |
| 158 | 173 |
| 159 module:hook("c2s-ondrain", function (event) | 174 module:hook("c2s-ondrain", function (event) |
| 160 local session = event.session; | 175 local session = event.session; |
| 161 if session.state == "inactive" and session.conn and session.conn.pause_writes then | 176 if session.state == "inactive" and session.conn and session.conn.pause_writes then |
| 162 session.conn:pause_writes(); | 177 session.conn:pause_writes(); |
| | 178 session.csi_measure_buffer_hold = measure_buffer_hold(); |
| 163 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); | 179 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); |
| 164 session.csi_counter = 0; | 180 session.csi_counter = 0; |
| 165 end | 181 end |
| 166 end); | 182 end); |
| 167 | 183 |