Comparison

plugins/mod_csi_simple.lua @ 10061:5c71693c8345

Merge 0.11->trunk
author Kim Alvefur <zash@zash.se>
date Mon, 08 Jul 2019 02:44:32 +0200
parent 10025:4498f601516d
child 10277:45a58127a3e5
comparison
equal deleted inserted replaced
10060:7a36b7ac309b 10061:5c71693c8345
7 module:depends"csi" 7 module:depends"csi"
8 8
9 local jid = require "util.jid"; 9 local jid = require "util.jid";
10 local st = require "util.stanza"; 10 local st = require "util.stanza";
11 local dt = require "util.datetime"; 11 local dt = require "util.datetime";
12 local new_queue = require "util.queue".new; 12 local filters = require "util.filters";
13
14 local function new_pump(output, ...)
15 -- luacheck: ignore 212/self
16 local q = new_queue(...);
17 local flush = true;
18 function q:pause()
19 flush = false;
20 end
21 function q:resume()
22 flush = true;
23 return q:flush();
24 end
25 local push = q.push;
26 function q:push(item)
27 local ok = push(self, item);
28 if not ok then
29 q:flush();
30 output(item, self);
31 elseif flush then
32 return q:flush();
33 end
34 return true;
35 end
36 function q:flush()
37 local item = self:pop();
38 while item do
39 output(item, self);
40 item = self:pop();
41 end
42 return true;
43 end
44 return q;
45 end
46 13
47 local queue_size = module:get_option_number("csi_queue_size", 256); 14 local queue_size = module:get_option_number("csi_queue_size", 256);
48 15
49 module:hook("csi-is-stanza-important", function (event) 16 module:hook("csi-is-stanza-important", function (event)
50 local stanza = event.stanza; 17 local stanza = event.stanza;
82 return false; 49 return false;
83 end 50 end
84 return true; 51 return true;
85 end, -1); 52 end, -1);
86 53
54 local function with_timestamp(stanza, from)
55 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then
56 stanza = st.clone(stanza);
57 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()}));
58 end
59 return stanza;
60 end
61
62 local function manage_buffer(stanza, session)
63 local ctr = session.csi_counter or 0;
64 if ctr >= queue_size then
65 session.log("debug", "Queue size limit hit, flushing buffer (queue size is %d)", session.csi_counter);
66 session.conn:resume_writes();
67 elseif module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
68 session.log("debug", "Important stanza, flushing buffer (queue size is %d)", session.csi_counter);
69 session.conn:resume_writes();
70 else
71 stanza = with_timestamp(stanza, jid.join(session.username, session.host))
72 end
73 session.csi_counter = ctr + 1;
74 return stanza;
75 end
76
77 local function flush_buffer(data, session)
78 session.log("debug", "Client sent something, flushing buffer once (queue size is %d)", session.csi_counter);
79 session.conn:resume_writes();
80 return data;
81 end
82
83 function enable_optimizations(session)
84 if session.conn and session.conn and session.conn.pause_writes then
85 session.conn:pause_writes();
86 filters.add_filter(session, "stanzas/out", manage_buffer);
87 filters.add_filter(session, "bytes/in", flush_buffer);
88 else
89 session.log("warn", "Session connection does not support write pausing");
90 end
91 end
92
93 function disable_optimizations(session)
94 if session.conn and session.conn and session.conn.resume_writes then
95 filters.remove_filter(session, "stanzas/out", manage_buffer);
96 filters.remove_filter(session, "bytes/in", flush_buffer);
97 session.conn:resume_writes();
98 end
99 end
100
87 module:hook("csi-client-inactive", function (event) 101 module:hook("csi-client-inactive", function (event)
88 local session = event.origin; 102 local session = event.origin;
89 if session.pump then 103 enable_optimizations(session);
90 session.pump:pause();
91 else
92 local bare_jid = jid.join(session.username, session.host);
93 local send = session.send;
94 session._orig_send = send;
95 local pump = new_pump(session.send, queue_size);
96 pump:pause();
97 session.pump = pump;
98 function session.send(stanza)
99 if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
100 pump:flush();
101 send(stanza);
102 else
103 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then
104 stanza = st.clone(stanza);
105 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()}));
106 end
107 pump:push(stanza);
108 end
109 return true;
110 end
111 end
112 end); 104 end);
113 105
114 module:hook("csi-client-active", function (event) 106 module:hook("csi-client-active", function (event)
115 local session = event.origin; 107 local session = event.origin;
116 if session.pump then 108 disable_optimizations(session);
117 session.pump:resume(); 109 end);
110
111 module:hook("pre-resource-unbind", function (event)
112 local session = event.session;
113 disable_optimizations(session);
114 end);
115
116 module:hook("c2s-ondrain", function (event)
117 local session = event.session;
118 if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then
119 session.conn:pause_writes();
120 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter);
121 session.csi_counter = 0;
118 end 122 end
119 end); 123 end);
120 124
125 function module.load()
126 for _, user_session in pairs(prosody.hosts[module.host].sessions) do
127 for _, session in pairs(user_session.sessions) do
128 if session.state == "inactive" then
129 enable_optimizations(session);
130 end
131 end
132 end
133 end
134
135 function module.unload()
136 for _, user_session in pairs(prosody.hosts[module.host].sessions) do
137 for _, session in pairs(user_session.sessions) do
138 if session.state == "inactive" then
139 disable_optimizations(session);
140 end
141 end
142 end
143 end