Comparison

plugins/mod_csi_simple.lua @ 9917:45b5528b128a

mod_csi_simple: Remove old "pump" queue/buffer method, handled in net.server now
author Kim Alvefur <zash@zash.se>
date Sun, 24 Mar 2019 20:22:01 +0100
parent 9914:2c6b7247c3d9
child 9918:6e9dcec259d0
comparison
equal deleted inserted replaced
9916:72501e3c1427 9917:45b5528b128a
7 module:depends"csi" 7 module:depends"csi"
8 8
9 local jid = require "util.jid"; 9 local jid = require "util.jid";
10 local st = require "util.stanza"; 10 local st = require "util.stanza";
11 local dt = require "util.datetime"; 11 local dt = require "util.datetime";
12 local new_queue = require "util.queue".new;
13 local filters = require "util.filters"; 12 local filters = require "util.filters";
14
15 local function new_pump(output, ...)
16 -- luacheck: ignore 212/self
17 local q = new_queue(...);
18 local flush = true;
19 function q:pause()
20 flush = false;
21 end
22 function q:resume()
23 flush = true;
24 return q:flush();
25 end
26 local push = q.push;
27 function q:push(item)
28 local ok = push(self, item);
29 if not ok then
30 q:flush();
31 output(item, self);
32 elseif flush then
33 return q:flush();
34 end
35 return true;
36 end
37 function q:flush()
38 local item = self:pop();
39 while item do
40 output(item, self);
41 item = self:pop();
42 end
43 return true;
44 end
45 return q;
46 end
47 13
48 local queue_size = module:get_option_number("csi_queue_size", 256); 14 local queue_size = module:get_option_number("csi_queue_size", 256);
49 15
50 module:hook("csi-is-stanza-important", function (event) 16 module:hook("csi-is-stanza-important", function (event)
51 local stanza = event.stanza; 17 local stanza = event.stanza;
107 local function flush_buffer(data, session) 73 local function flush_buffer(data, session)
108 session.conn:resume_writes(); 74 session.conn:resume_writes();
109 return data; 75 return data;
110 end 76 end
111 77
112 local function flush_pump(data, session)
113 session.pump:flush();
114 return data;
115 end
116
117 module:hook("csi-client-inactive", function (event) 78 module:hook("csi-client-inactive", function (event)
118 local session = event.origin; 79 local session = event.origin;
119 if session.conn and session.conn and session.conn.pause_writes then 80 if session.conn and session.conn and session.conn.pause_writes then
120 session.log("info", "Native net.server buffer management mode");
121 session.conn:pause_writes(); 81 session.conn:pause_writes();
122 filters.add_filter(session, "stanzas/out", manage_buffer); 82 filters.add_filter(session, "stanzas/out", manage_buffer);
123 filters.add_filter(session, "bytes/in", flush_buffer); 83 filters.add_filter(session, "bytes/in", flush_buffer);
124 elseif session.pump then
125 session.pump:pause();
126 else 84 else
127 local bare_jid = jid.join(session.username, session.host); 85 session.log("warn", "Session connection does not support write pausing");
128 local send = session.send;
129 session._orig_send = send;
130 local pump = new_pump(session.send, queue_size);
131 pump:pause();
132 session.pump = pump;
133 filters.add_filter(session, "bytes/in", flush_pump);
134 function session.send(stanza)
135 if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
136 pump:flush();
137 send(stanza);
138 else
139 pump:push(with_timestamp(stanza, bare_jid));
140 end
141 return true;
142 end
143 end 86 end
144 end); 87 end);
145 88
146 module:hook("csi-client-active", function (event) 89 module:hook("csi-client-active", function (event)
147 local session = event.origin; 90 local session = event.origin;
148 if session.pump then 91 if session.conn and session.conn and session.conn.resume_writes then
149 session.pump:resume();
150 elseif session.conn and session.conn and session.conn.resume_writes then
151 filters.remove_filter(session, "stanzas/out", manage_buffer); 92 filters.remove_filter(session, "stanzas/out", manage_buffer);
152 filters.remove_filter(session, "bytes/in", flush_buffer); 93 filters.remove_filter(session, "bytes/in", flush_buffer);
153 session.conn:resume_writes(); 94 session.conn:resume_writes();
154 end 95 end
155 end); 96 end);