Software /
code /
prosody
Comparison
plugins/mod_csi_simple.lua @ 10061:5c71693c8345
Merge 0.11->trunk
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 08 Jul 2019 02:44:32 +0200 |
parent | 10025:4498f601516d |
child | 10277:45a58127a3e5 |
comparison
equal
deleted
inserted
replaced
10060:7a36b7ac309b | 10061:5c71693c8345 |
---|---|
7 module:depends"csi" | 7 module:depends"csi" |
8 | 8 |
9 local jid = require "util.jid"; | 9 local jid = require "util.jid"; |
10 local st = require "util.stanza"; | 10 local st = require "util.stanza"; |
11 local dt = require "util.datetime"; | 11 local dt = require "util.datetime"; |
12 local new_queue = require "util.queue".new; | 12 local filters = require "util.filters"; |
13 | |
14 local function new_pump(output, ...) | |
15 -- luacheck: ignore 212/self | |
16 local q = new_queue(...); | |
17 local flush = true; | |
18 function q:pause() | |
19 flush = false; | |
20 end | |
21 function q:resume() | |
22 flush = true; | |
23 return q:flush(); | |
24 end | |
25 local push = q.push; | |
26 function q:push(item) | |
27 local ok = push(self, item); | |
28 if not ok then | |
29 q:flush(); | |
30 output(item, self); | |
31 elseif flush then | |
32 return q:flush(); | |
33 end | |
34 return true; | |
35 end | |
36 function q:flush() | |
37 local item = self:pop(); | |
38 while item do | |
39 output(item, self); | |
40 item = self:pop(); | |
41 end | |
42 return true; | |
43 end | |
44 return q; | |
45 end | |
46 | 13 |
47 local queue_size = module:get_option_number("csi_queue_size", 256); | 14 local queue_size = module:get_option_number("csi_queue_size", 256); |
48 | 15 |
49 module:hook("csi-is-stanza-important", function (event) | 16 module:hook("csi-is-stanza-important", function (event) |
50 local stanza = event.stanza; | 17 local stanza = event.stanza; |
82 return false; | 49 return false; |
83 end | 50 end |
84 return true; | 51 return true; |
85 end, -1); | 52 end, -1); |
86 | 53 |
54 local function with_timestamp(stanza, from) | |
55 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then | |
56 stanza = st.clone(stanza); | |
57 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); | |
58 end | |
59 return stanza; | |
60 end | |
61 | |
62 local function manage_buffer(stanza, session) | |
63 local ctr = session.csi_counter or 0; | |
64 if ctr >= queue_size then | |
65 session.log("debug", "Queue size limit hit, flushing buffer (queue size is %d)", session.csi_counter); | |
66 session.conn:resume_writes(); | |
67 elseif module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then | |
68 session.log("debug", "Important stanza, flushing buffer (queue size is %d)", session.csi_counter); | |
69 session.conn:resume_writes(); | |
70 else | |
71 stanza = with_timestamp(stanza, jid.join(session.username, session.host)) | |
72 end | |
73 session.csi_counter = ctr + 1; | |
74 return stanza; | |
75 end | |
76 | |
77 local function flush_buffer(data, session) | |
78 session.log("debug", "Client sent something, flushing buffer once (queue size is %d)", session.csi_counter); | |
79 session.conn:resume_writes(); | |
80 return data; | |
81 end | |
82 | |
83 function enable_optimizations(session) | |
84 if session.conn and session.conn and session.conn.pause_writes then | |
85 session.conn:pause_writes(); | |
86 filters.add_filter(session, "stanzas/out", manage_buffer); | |
87 filters.add_filter(session, "bytes/in", flush_buffer); | |
88 else | |
89 session.log("warn", "Session connection does not support write pausing"); | |
90 end | |
91 end | |
92 | |
93 function disable_optimizations(session) | |
94 if session.conn and session.conn and session.conn.resume_writes then | |
95 filters.remove_filter(session, "stanzas/out", manage_buffer); | |
96 filters.remove_filter(session, "bytes/in", flush_buffer); | |
97 session.conn:resume_writes(); | |
98 end | |
99 end | |
100 | |
87 module:hook("csi-client-inactive", function (event) | 101 module:hook("csi-client-inactive", function (event) |
88 local session = event.origin; | 102 local session = event.origin; |
89 if session.pump then | 103 enable_optimizations(session); |
90 session.pump:pause(); | |
91 else | |
92 local bare_jid = jid.join(session.username, session.host); | |
93 local send = session.send; | |
94 session._orig_send = send; | |
95 local pump = new_pump(session.send, queue_size); | |
96 pump:pause(); | |
97 session.pump = pump; | |
98 function session.send(stanza) | |
99 if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then | |
100 pump:flush(); | |
101 send(stanza); | |
102 else | |
103 if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then | |
104 stanza = st.clone(stanza); | |
105 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = bare_jid, stamp = dt.datetime()})); | |
106 end | |
107 pump:push(stanza); | |
108 end | |
109 return true; | |
110 end | |
111 end | |
112 end); | 104 end); |
113 | 105 |
114 module:hook("csi-client-active", function (event) | 106 module:hook("csi-client-active", function (event) |
115 local session = event.origin; | 107 local session = event.origin; |
116 if session.pump then | 108 disable_optimizations(session); |
117 session.pump:resume(); | 109 end); |
110 | |
111 module:hook("pre-resource-unbind", function (event) | |
112 local session = event.session; | |
113 disable_optimizations(session); | |
114 end); | |
115 | |
116 module:hook("c2s-ondrain", function (event) | |
117 local session = event.session; | |
118 if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then | |
119 session.conn:pause_writes(); | |
120 session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); | |
121 session.csi_counter = 0; | |
118 end | 122 end |
119 end); | 123 end); |
120 | 124 |
125 function module.load() | |
126 for _, user_session in pairs(prosody.hosts[module.host].sessions) do | |
127 for _, session in pairs(user_session.sessions) do | |
128 if session.state == "inactive" then | |
129 enable_optimizations(session); | |
130 end | |
131 end | |
132 end | |
133 end | |
134 | |
135 function module.unload() | |
136 for _, user_session in pairs(prosody.hosts[module.host].sessions) do | |
137 for _, session in pairs(user_session.sessions) do | |
138 if session.state == "inactive" then | |
139 disable_optimizations(session); | |
140 end | |
141 end | |
142 end | |
143 end |