Comparison

plugins/mod_csi_simple.lua @ 9912:601f9781a605

mod_csi_simple: Count buffered items and flush when it reaches configured limit In this mode, stanzas have been serialized to strings in the internal net.server buffer, so it is difficult to count them after the fact.
author Kim Alvefur <zash@zash.se>
date Sun, 24 Mar 2019 18:33:38 +0100
parent 9911:ed011935c22d
child 9913:7d78b24d8449
comparison
equal deleted inserted replaced
9911:ed011935c22d 9912:601f9781a605
8 8
9 local jid = require "util.jid"; 9 local jid = require "util.jid";
10 local st = require "util.stanza"; 10 local st = require "util.stanza";
11 local dt = require "util.datetime"; 11 local dt = require "util.datetime";
12 local new_queue = require "util.queue".new; 12 local new_queue = require "util.queue".new;
13 local filters = require "util.filters";
13 14
14 local function new_pump(output, ...) 15 local function new_pump(output, ...)
15 -- luacheck: ignore 212/self 16 -- luacheck: ignore 212/self
16 local q = new_queue(...); 17 local q = new_queue(...);
17 local flush = true; 18 local flush = true;
90 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); 91 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()}));
91 end 92 end
92 return stanza; 93 return stanza;
93 end 94 end
94 95
96 local function manage_buffer(stanza, session)
97 local ctr = session.csi_counter or 0;
98 if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
99 session.conn:resume_writes();
100 else
101 stanza = with_timestamp(stanza, jid.join(session.username, session.host))
102 end
103 session.csi_counter = ctr + 1;
104 return stanza;
105 end
106
95 module:hook("csi-client-inactive", function (event) 107 module:hook("csi-client-inactive", function (event)
96 local session = event.origin; 108 local session = event.origin;
97 if session.conn and session.conn and session.conn.pause_writes then 109 if session.conn and session.conn and session.conn.pause_writes then
98 session.conn:pause_writes(); 110 session.conn:pause_writes();
111 filters.add_filter(session, "stanzas/out", manage_buffer);
99 elseif session.pump then 112 elseif session.pump then
100 session.pump:pause(); 113 session.pump:pause();
101 else 114 else
102 local bare_jid = jid.join(session.username, session.host); 115 local bare_jid = jid.join(session.username, session.host);
103 local send = session.send; 116 local send = session.send;
120 module:hook("csi-client-active", function (event) 133 module:hook("csi-client-active", function (event)
121 local session = event.origin; 134 local session = event.origin;
122 if session.pump then 135 if session.pump then
123 session.pump:resume(); 136 session.pump:resume();
124 elseif session.conn and session.conn and session.conn.resume_writes then 137 elseif session.conn and session.conn and session.conn.resume_writes then
138 filters.remove_filter(session, "stanzas/out", manage_buffer);
125 session.conn:resume_writes(); 139 session.conn:resume_writes();
126 end 140 end
127 end); 141 end);
128 142
143
144 module:hook("c2s-ondrain", function (event)
145 local session = event.session;
146 if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then
147 session.csi_counter = 0;
148 session.conn:pause_writes();
149 end
150 end);