Comparison

plugins/mod_csi_simple.lua @ 11120:b2331f3dfeea

Merge 0.11->trunk
author Matthew Wild <mwild1@gmail.com>
date Wed, 30 Sep 2020 09:50:33 +0100
parent 10833:ac691f305ea7
child 11260:08b397c21805
comparison
equal deleted inserted replaced
11119:68df52bf08d5 11120:b2331f3dfeea
1 -- Copyright (C) 2016-2018 Kim Alvefur 1 -- Copyright (C) 2016-2020 Kim Alvefur
2 -- 2 --
3 -- This project is MIT/X11 licensed. Please see the 3 -- This project is MIT/X11 licensed. Please see the
4 -- COPYING file in the source package for more information. 4 -- COPYING file in the source package for more information.
5 -- 5 --
6 6
7 module:depends"csi" 7 module:depends"csi"
8 8
9 local jid = require "util.jid"; 9 local jid = require "util.jid";
10 local st = require "util.stanza"; 10 local st = require "util.stanza";
11 local dt = require "util.datetime"; 11 local dt = require "util.datetime";
12 local new_queue = require "util.queue".new; 12 local filters = require "util.filters";
13
14 local function new_pump(output, ...)
15 -- luacheck: ignore 212/self
16 local q = new_queue(...);
17 local flush = true;
18 function q:pause()
19 flush = false;
20 end
21 function q:resume()
22 flush = true;
23 return q:flush();
24 end
25 local push = q.push;
26 function q:push(item)
27 local ok = push(self, item);
28 if not ok then
29 q:flush();
30 output(item, self);
31 elseif flush then
32 return q:flush();
33 end
34 return true;
35 end
36 function q:flush()
37 local item = self:pop();
38 while item do
39 output(item, self);
40 item = self:pop();
41 end
42 return true;
43 end
44 return q;
45 end
46 13
47 local queue_size = module:get_option_number("csi_queue_size", 256); 14 local queue_size = module:get_option_number("csi_queue_size", 256);
48 15
49 module:hook("csi-is-stanza-important", function (event) 16 local important_payloads = module:get_option_set("csi_important_payloads", { });
50 local stanza = event.stanza; 17
51 if not st.is_stanza(stanza) then 18 function is_important(stanza) --> boolean, reason: string
52 return true; 19 if stanza == " " then
20 return true, "whitespace keepalive";
21 elseif type(stanza) == "string" then
22 return true, "raw data";
23 elseif not st.is_stanza(stanza) then
24 -- This should probably never happen
25 return true, type(stanza);
26 end
27 if stanza.attr.xmlns ~= nil then
28 -- stream errors, stream management etc
29 return true, "nonza";
53 end 30 end
54 local st_name = stanza.name; 31 local st_name = stanza.name;
55 if not st_name then return false; end 32 if not st_name then return false; end
56 local st_type = stanza.attr.type; 33 local st_type = stanza.attr.type;
57 if st_name == "presence" then 34 if st_name == "presence" then
58 if st_type == nil or st_type == "unavailable" then 35 if st_type == nil or st_type == "unavailable" or st_type == "error" then
59 return false; 36 return false, "presence update";
60 end 37 end
61 return true; 38 -- TODO Some MUC awareness, e.g. check for the 'this relates to you' status code
39 return true, "subscription request";
62 elseif st_name == "message" then 40 elseif st_name == "message" then
63 if st_type == "headline" then 41 if st_type == "headline" then
64 return false; 42 -- Headline messages are ephemeral by definition
43 return false, "headline";
44 end
45 if st_type == "error" then
46 return true, "delivery failure";
65 end 47 end
66 if stanza:get_child("sent", "urn:xmpp:carbons:2") then 48 if stanza:get_child("sent", "urn:xmpp:carbons:2") then
67 return true; 49 return true, "carbon";
68 end 50 end
69 local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message"); 51 local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message");
70 if forwarded then 52 if forwarded then
71 stanza = forwarded; 53 stanza = forwarded;
72 end 54 end
73 if stanza:get_child("body") then 55 if stanza:get_child("body") then
74 return true; 56 return true, "body";
75 end 57 end
76 if stanza:get_child("subject") then 58 if stanza:get_child("subject") then
77 return true; 59 -- Last step of a MUC join
60 return true, "subject";
78 end 61 end
79 if stanza:get_child("encryption", "urn:xmpp:eme:0") then 62 if stanza:get_child("encryption", "urn:xmpp:eme:0") then
80 return true; 63 -- Since we can't know what an encrypted message contains, we assume it's important
64 -- XXX Experimental XEP
65 return true, "encrypted";
66 end
67 if stanza:get_child("x", "jabber:x:conference") or stanza:find("{http://jabber.org/protocol/muc#user}x/invite") then
68 return true, "invite";
81 end 69 end
82 if stanza:get_child(nil, "urn:xmpp:jingle-message:0") then 70 if stanza:get_child(nil, "urn:xmpp:jingle-message:0") then
83 return true; 71 -- XXX Experimental XEP stuck in Proposed for almost a year at the time of this comment
72 return true, "jingle call";
73 end
74 for important in important_payloads do
75 if stanza:find(important) then
76 return true;
77 end
84 end 78 end
85 return false; 79 return false;
86 end 80 elseif st_name == "iq" then
87 return true; 81 return true;
82 end
83 end
84
85 module:hook("csi-is-stanza-important", function (event)
86 local important, why = is_important(event.stanza);
87 event.reason = why;
88 return important;
88 end, -1); 89 end, -1);
90
91 local function should_flush(stanza, session, ctr) --> boolean, reason: string
92 if ctr >= queue_size then
93 return true, "queue size limit reached";
94 end
95 local event = { stanza = stanza, session = session };
96 local ret = module:fire_event("csi-is-stanza-important", event)
97 return ret, event.reason;
98 end
99
100 local function with_timestamp(stanza, from)
101 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then
102 stanza = st.clone(stanza);
103 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()}));
104 end
105 return stanza;
106 end
107
108 local measure_buffer_hold = module:measure("buffer_hold", "times");
109
110 local flush_reasons = setmetatable({}, {
111 __index = function (t, reason)
112 local m = module:measure("flush_reason."..reason:gsub("%W", "_"), "rate");
113 t[reason] = m;
114 return m;
115 end;
116 });
117
118
119 local function manage_buffer(stanza, session)
120 local ctr = session.csi_counter or 0;
121 local flush, why = should_flush(stanza, session, ctr);
122 if flush then
123 if session.csi_measure_buffer_hold then
124 session.csi_measure_buffer_hold();
125 session.csi_measure_buffer_hold = nil;
126 end
127 flush_reasons[why or "important"]();
128 session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter);
129 session.conn:resume_writes();
130 else
131 session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter);
132 stanza = with_timestamp(stanza, jid.join(session.username, session.host))
133 end
134 session.csi_counter = ctr + 1;
135 return stanza;
136 end
137
138 local function flush_buffer(data, session)
139 session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter);
140 flush_reasons["client activity"]();
141 if session.csi_measure_buffer_hold then
142 session.csi_measure_buffer_hold();
143 session.csi_measure_buffer_hold = nil;
144 end
145 session.conn:resume_writes();
146 return data;
147 end
148
149 function enable_optimizations(session)
150 if session.conn and session.conn.pause_writes then
151 session.conn:pause_writes();
152 session.csi_measure_buffer_hold = measure_buffer_hold();
153 session.csi_counter = 0;
154 filters.add_filter(session, "stanzas/out", manage_buffer);
155 filters.add_filter(session, "bytes/in", flush_buffer);
156 else
157 session.log("warn", "Session connection does not support write pausing");
158 end
159 end
160
161 function disable_optimizations(session)
162 filters.remove_filter(session, "stanzas/out", manage_buffer);
163 filters.remove_filter(session, "bytes/in", flush_buffer);
164 session.csi_counter = nil;
165 if session.csi_measure_buffer_hold then
166 session.csi_measure_buffer_hold();
167 session.csi_measure_buffer_hold = nil;
168 end
169 if session.conn and session.conn.resume_writes then
170 session.conn:resume_writes();
171 end
172 end
89 173
90 module:hook("csi-client-inactive", function (event) 174 module:hook("csi-client-inactive", function (event)
91 local session = event.origin; 175 local session = event.origin;
92 if session.pump then 176 enable_optimizations(session);
93 session.pump:pause();
94 else
95 local bare_jid = jid.join(session.username, session.host);
96 local send = session.send;
97 session._orig_send = send;
98 local pump = new_pump(session.send, queue_size);
99 pump:pause();
100 session.pump = pump;
101 function session.send(stanza)
102 if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
103 pump:flush();
104 send(stanza);
105 else
106 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then
107 stanza = st.clone(stanza);
108 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()}));
109 end
110 pump:push(stanza);
111 end
112 return true;
113 end
114 end
115 end); 177 end);
116 178
117 module:hook("csi-client-active", function (event) 179 module:hook("csi-client-active", function (event)
118 local session = event.origin; 180 local session = event.origin;
119 if session.pump then 181 disable_optimizations(session);
120 session.pump:resume();
121 end
122 end); 182 end);
123 183
184 module:hook("pre-resource-unbind", function (event)
185 local session = event.session;
186 disable_optimizations(session);
187 end, 1);
188
189 module:hook("c2s-ondrain", function (event)
190 local session = event.session;
191 if session.state == "inactive" and session.conn and session.conn.pause_writes then
192 session.conn:pause_writes();
193 session.csi_measure_buffer_hold = measure_buffer_hold();
194 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter);
195 session.csi_counter = 0;
196 end
197 end);
198
199 function module.load()
200 for _, user_session in pairs(prosody.hosts[module.host].sessions) do
201 for _, session in pairs(user_session.sessions) do
202 if session.state == "inactive" then
203 enable_optimizations(session);
204 end
205 end
206 end
207 end
208
209 function module.unload()
210 for _, user_session in pairs(prosody.hosts[module.host].sessions) do
211 for _, session in pairs(user_session.sessions) do
212 if session.state == "inactive" then
213 disable_optimizations(session);
214 end
215 end
216 end
217 end