Software /
code /
prosody
Comparison
plugins/mod_csi_simple.lua @ 11120:b2331f3dfeea
Merge 0.11->trunk
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 30 Sep 2020 09:50:33 +0100 |
parent | 10833:ac691f305ea7 |
child | 11260:08b397c21805 |
comparison
equal
deleted
inserted
replaced
11119:68df52bf08d5 | 11120:b2331f3dfeea |
---|---|
1 -- Copyright (C) 2016-2018 Kim Alvefur | 1 -- Copyright (C) 2016-2020 Kim Alvefur |
2 -- | 2 -- |
3 -- This project is MIT/X11 licensed. Please see the | 3 -- This project is MIT/X11 licensed. Please see the |
4 -- COPYING file in the source package for more information. | 4 -- COPYING file in the source package for more information. |
5 -- | 5 -- |
6 | 6 |
7 module:depends"csi" | 7 module:depends"csi" |
8 | 8 |
9 local jid = require "util.jid"; | 9 local jid = require "util.jid"; |
10 local st = require "util.stanza"; | 10 local st = require "util.stanza"; |
11 local dt = require "util.datetime"; | 11 local dt = require "util.datetime"; |
12 local new_queue = require "util.queue".new; | 12 local filters = require "util.filters"; |
13 | |
14 local function new_pump(output, ...) | |
15 -- luacheck: ignore 212/self | |
16 local q = new_queue(...); | |
17 local flush = true; | |
18 function q:pause() | |
19 flush = false; | |
20 end | |
21 function q:resume() | |
22 flush = true; | |
23 return q:flush(); | |
24 end | |
25 local push = q.push; | |
26 function q:push(item) | |
27 local ok = push(self, item); | |
28 if not ok then | |
29 q:flush(); | |
30 output(item, self); | |
31 elseif flush then | |
32 return q:flush(); | |
33 end | |
34 return true; | |
35 end | |
36 function q:flush() | |
37 local item = self:pop(); | |
38 while item do | |
39 output(item, self); | |
40 item = self:pop(); | |
41 end | |
42 return true; | |
43 end | |
44 return q; | |
45 end | |
46 | 13 |
47 local queue_size = module:get_option_number("csi_queue_size", 256); | 14 local queue_size = module:get_option_number("csi_queue_size", 256); |
48 | 15 |
49 module:hook("csi-is-stanza-important", function (event) | 16 local important_payloads = module:get_option_set("csi_important_payloads", { }); |
50 local stanza = event.stanza; | 17 |
51 if not st.is_stanza(stanza) then | 18 function is_important(stanza) --> boolean, reason: string |
52 return true; | 19 if stanza == " " then |
20 return true, "whitespace keepalive"; | |
21 elseif type(stanza) == "string" then | |
22 return true, "raw data"; | |
23 elseif not st.is_stanza(stanza) then | |
24 -- This should probably never happen | |
25 return true, type(stanza); | |
26 end | |
27 if stanza.attr.xmlns ~= nil then | |
28 -- stream errors, stream management etc | |
29 return true, "nonza"; | |
53 end | 30 end |
54 local st_name = stanza.name; | 31 local st_name = stanza.name; |
55 if not st_name then return false; end | 32 if not st_name then return false; end |
56 local st_type = stanza.attr.type; | 33 local st_type = stanza.attr.type; |
57 if st_name == "presence" then | 34 if st_name == "presence" then |
58 if st_type == nil or st_type == "unavailable" then | 35 if st_type == nil or st_type == "unavailable" or st_type == "error" then |
59 return false; | 36 return false, "presence update"; |
60 end | 37 end |
61 return true; | 38 -- TODO Some MUC awareness, e.g. check for the 'this relates to you' status code |
39 return true, "subscription request"; | |
62 elseif st_name == "message" then | 40 elseif st_name == "message" then |
63 if st_type == "headline" then | 41 if st_type == "headline" then |
64 return false; | 42 -- Headline messages are ephemeral by definition |
43 return false, "headline"; | |
44 end | |
45 if st_type == "error" then | |
46 return true, "delivery failure"; | |
65 end | 47 end |
66 if stanza:get_child("sent", "urn:xmpp:carbons:2") then | 48 if stanza:get_child("sent", "urn:xmpp:carbons:2") then |
67 return true; | 49 return true, "carbon"; |
68 end | 50 end |
69 local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message"); | 51 local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message"); |
70 if forwarded then | 52 if forwarded then |
71 stanza = forwarded; | 53 stanza = forwarded; |
72 end | 54 end |
73 if stanza:get_child("body") then | 55 if stanza:get_child("body") then |
74 return true; | 56 return true, "body"; |
75 end | 57 end |
76 if stanza:get_child("subject") then | 58 if stanza:get_child("subject") then |
77 return true; | 59 -- Last step of a MUC join |
60 return true, "subject"; | |
78 end | 61 end |
79 if stanza:get_child("encryption", "urn:xmpp:eme:0") then | 62 if stanza:get_child("encryption", "urn:xmpp:eme:0") then |
80 return true; | 63 -- Since we can't know what an encrypted message contains, we assume it's important |
64 -- XXX Experimental XEP | |
65 return true, "encrypted"; | |
66 end | |
67 if stanza:get_child("x", "jabber:x:conference") or stanza:find("{http://jabber.org/protocol/muc#user}x/invite") then | |
68 return true, "invite"; | |
81 end | 69 end |
82 if stanza:get_child(nil, "urn:xmpp:jingle-message:0") then | 70 if stanza:get_child(nil, "urn:xmpp:jingle-message:0") then |
83 return true; | 71 -- XXX Experimental XEP stuck in Proposed for almost a year at the time of this comment |
72 return true, "jingle call"; | |
73 end | |
74 for important in important_payloads do | |
75 if stanza:find(important) then | |
76 return true; | |
77 end | |
84 end | 78 end |
85 return false; | 79 return false; |
86 end | 80 elseif st_name == "iq" then |
87 return true; | 81 return true; |
82 end | |
83 end | |
84 | |
85 module:hook("csi-is-stanza-important", function (event) | |
86 local important, why = is_important(event.stanza); | |
87 event.reason = why; | |
88 return important; | |
88 end, -1); | 89 end, -1); |
90 | |
91 local function should_flush(stanza, session, ctr) --> boolean, reason: string | |
92 if ctr >= queue_size then | |
93 return true, "queue size limit reached"; | |
94 end | |
95 local event = { stanza = stanza, session = session }; | |
96 local ret = module:fire_event("csi-is-stanza-important", event) | |
97 return ret, event.reason; | |
98 end | |
99 | |
100 local function with_timestamp(stanza, from) | |
101 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then | |
102 stanza = st.clone(stanza); | |
103 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); | |
104 end | |
105 return stanza; | |
106 end | |
107 | |
108 local measure_buffer_hold = module:measure("buffer_hold", "times"); | |
109 | |
110 local flush_reasons = setmetatable({}, { | |
111 __index = function (t, reason) | |
112 local m = module:measure("flush_reason."..reason:gsub("%W", "_"), "rate"); | |
113 t[reason] = m; | |
114 return m; | |
115 end; | |
116 }); | |
117 | |
118 | |
119 local function manage_buffer(stanza, session) | |
120 local ctr = session.csi_counter or 0; | |
121 local flush, why = should_flush(stanza, session, ctr); | |
122 if flush then | |
123 if session.csi_measure_buffer_hold then | |
124 session.csi_measure_buffer_hold(); | |
125 session.csi_measure_buffer_hold = nil; | |
126 end | |
127 flush_reasons[why or "important"](); | |
128 session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter); | |
129 session.conn:resume_writes(); | |
130 else | |
131 session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter); | |
132 stanza = with_timestamp(stanza, jid.join(session.username, session.host)) | |
133 end | |
134 session.csi_counter = ctr + 1; | |
135 return stanza; | |
136 end | |
137 | |
138 local function flush_buffer(data, session) | |
139 session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter); | |
140 flush_reasons["client activity"](); | |
141 if session.csi_measure_buffer_hold then | |
142 session.csi_measure_buffer_hold(); | |
143 session.csi_measure_buffer_hold = nil; | |
144 end | |
145 session.conn:resume_writes(); | |
146 return data; | |
147 end | |
148 | |
149 function enable_optimizations(session) | |
150 if session.conn and session.conn.pause_writes then | |
151 session.conn:pause_writes(); | |
152 session.csi_measure_buffer_hold = measure_buffer_hold(); | |
153 session.csi_counter = 0; | |
154 filters.add_filter(session, "stanzas/out", manage_buffer); | |
155 filters.add_filter(session, "bytes/in", flush_buffer); | |
156 else | |
157 session.log("warn", "Session connection does not support write pausing"); | |
158 end | |
159 end | |
160 | |
161 function disable_optimizations(session) | |
162 filters.remove_filter(session, "stanzas/out", manage_buffer); | |
163 filters.remove_filter(session, "bytes/in", flush_buffer); | |
164 session.csi_counter = nil; | |
165 if session.csi_measure_buffer_hold then | |
166 session.csi_measure_buffer_hold(); | |
167 session.csi_measure_buffer_hold = nil; | |
168 end | |
169 if session.conn and session.conn.resume_writes then | |
170 session.conn:resume_writes(); | |
171 end | |
172 end | |
89 | 173 |
90 module:hook("csi-client-inactive", function (event) | 174 module:hook("csi-client-inactive", function (event) |
91 local session = event.origin; | 175 local session = event.origin; |
92 if session.pump then | 176 enable_optimizations(session); |
93 session.pump:pause(); | |
94 else | |
95 local bare_jid = jid.join(session.username, session.host); | |
96 local send = session.send; | |
97 session._orig_send = send; | |
98 local pump = new_pump(session.send, queue_size); | |
99 pump:pause(); | |
100 session.pump = pump; | |
101 function session.send(stanza) | |
102 if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then | |
103 pump:flush(); | |
104 send(stanza); | |
105 else | |
106 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then | |
107 stanza = st.clone(stanza); | |
108 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()})); | |
109 end | |
110 pump:push(stanza); | |
111 end | |
112 return true; | |
113 end | |
114 end | |
115 end); | 177 end); |
116 | 178 |
117 module:hook("csi-client-active", function (event) | 179 module:hook("csi-client-active", function (event) |
118 local session = event.origin; | 180 local session = event.origin; |
119 if session.pump then | 181 disable_optimizations(session); |
120 session.pump:resume(); | |
121 end | |
122 end); | 182 end); |
123 | 183 |
184 module:hook("pre-resource-unbind", function (event) | |
185 local session = event.session; | |
186 disable_optimizations(session); | |
187 end, 1); | |
188 | |
189 module:hook("c2s-ondrain", function (event) | |
190 local session = event.session; | |
191 if session.state == "inactive" and session.conn and session.conn.pause_writes then | |
192 session.conn:pause_writes(); | |
193 session.csi_measure_buffer_hold = measure_buffer_hold(); | |
194 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); | |
195 session.csi_counter = 0; | |
196 end | |
197 end); | |
198 | |
199 function module.load() | |
200 for _, user_session in pairs(prosody.hosts[module.host].sessions) do | |
201 for _, session in pairs(user_session.sessions) do | |
202 if session.state == "inactive" then | |
203 enable_optimizations(session); | |
204 end | |
205 end | |
206 end | |
207 end | |
208 | |
209 function module.unload() | |
210 for _, user_session in pairs(prosody.hosts[module.host].sessions) do | |
211 for _, session in pairs(user_session.sessions) do | |
212 if session.state == "inactive" then | |
213 disable_optimizations(session); | |
214 end | |
215 end | |
216 end | |
217 end |