Software /
code /
prosody
Comparison
plugins/mod_csi_simple.lua @ 9917:45b5528b128a
mod_csi_simple: Remove old "pump" queue/buffer method, handled in net.server now
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sun, 24 Mar 2019 20:22:01 +0100 |
parent | 9914:2c6b7247c3d9 |
child | 9918:6e9dcec259d0 |
comparison
equal
deleted
inserted
replaced
9916:72501e3c1427 | 9917:45b5528b128a |
---|---|
7 module:depends"csi" | 7 module:depends"csi" |
8 | 8 |
9 local jid = require "util.jid"; | 9 local jid = require "util.jid"; |
10 local st = require "util.stanza"; | 10 local st = require "util.stanza"; |
11 local dt = require "util.datetime"; | 11 local dt = require "util.datetime"; |
12 local new_queue = require "util.queue".new; | |
13 local filters = require "util.filters"; | 12 local filters = require "util.filters"; |
14 | |
15 local function new_pump(output, ...) | |
16 -- luacheck: ignore 212/self | |
17 local q = new_queue(...); | |
18 local flush = true; | |
19 function q:pause() | |
20 flush = false; | |
21 end | |
22 function q:resume() | |
23 flush = true; | |
24 return q:flush(); | |
25 end | |
26 local push = q.push; | |
27 function q:push(item) | |
28 local ok = push(self, item); | |
29 if not ok then | |
30 q:flush(); | |
31 output(item, self); | |
32 elseif flush then | |
33 return q:flush(); | |
34 end | |
35 return true; | |
36 end | |
37 function q:flush() | |
38 local item = self:pop(); | |
39 while item do | |
40 output(item, self); | |
41 item = self:pop(); | |
42 end | |
43 return true; | |
44 end | |
45 return q; | |
46 end | |
47 | 13 |
48 local queue_size = module:get_option_number("csi_queue_size", 256); | 14 local queue_size = module:get_option_number("csi_queue_size", 256); |
49 | 15 |
50 module:hook("csi-is-stanza-important", function (event) | 16 module:hook("csi-is-stanza-important", function (event) |
51 local stanza = event.stanza; | 17 local stanza = event.stanza; |
107 local function flush_buffer(data, session) | 73 local function flush_buffer(data, session) |
108 session.conn:resume_writes(); | 74 session.conn:resume_writes(); |
109 return data; | 75 return data; |
110 end | 76 end |
111 | 77 |
112 local function flush_pump(data, session) | |
113 session.pump:flush(); | |
114 return data; | |
115 end | |
116 | |
117 module:hook("csi-client-inactive", function (event) | 78 module:hook("csi-client-inactive", function (event) |
118 local session = event.origin; | 79 local session = event.origin; |
119 if session.conn and session.conn and session.conn.pause_writes then | 80 if session.conn and session.conn and session.conn.pause_writes then |
120 session.log("info", "Native net.server buffer management mode"); | |
121 session.conn:pause_writes(); | 81 session.conn:pause_writes(); |
122 filters.add_filter(session, "stanzas/out", manage_buffer); | 82 filters.add_filter(session, "stanzas/out", manage_buffer); |
123 filters.add_filter(session, "bytes/in", flush_buffer); | 83 filters.add_filter(session, "bytes/in", flush_buffer); |
124 elseif session.pump then | |
125 session.pump:pause(); | |
126 else | 84 else |
127 local bare_jid = jid.join(session.username, session.host); | 85 session.log("warn", "Session connection does not support write pausing"); |
128 local send = session.send; | |
129 session._orig_send = send; | |
130 local pump = new_pump(session.send, queue_size); | |
131 pump:pause(); | |
132 session.pump = pump; | |
133 filters.add_filter(session, "bytes/in", flush_pump); | |
134 function session.send(stanza) | |
135 if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then | |
136 pump:flush(); | |
137 send(stanza); | |
138 else | |
139 pump:push(with_timestamp(stanza, bare_jid)); | |
140 end | |
141 return true; | |
142 end | |
143 end | 86 end |
144 end); | 87 end); |
145 | 88 |
146 module:hook("csi-client-active", function (event) | 89 module:hook("csi-client-active", function (event) |
147 local session = event.origin; | 90 local session = event.origin; |
148 if session.pump then | 91 if session.conn and session.conn and session.conn.resume_writes then |
149 session.pump:resume(); | |
150 elseif session.conn and session.conn and session.conn.resume_writes then | |
151 filters.remove_filter(session, "stanzas/out", manage_buffer); | 92 filters.remove_filter(session, "stanzas/out", manage_buffer); |
152 filters.remove_filter(session, "bytes/in", flush_buffer); | 93 filters.remove_filter(session, "bytes/in", flush_buffer); |
153 session.conn:resume_writes(); | 94 session.conn:resume_writes(); |
154 end | 95 end |
155 end); | 96 end); |