Software /
code /
prosody
Comparison
plugins/mod_csi_simple.lua @ 9912:601f9781a605
mod_csi_simple: Count buffered items and flush when it reaches configured limit
In this mode, stanzas have been serialized to strings in the internal
net.server buffer, so it is difficult to count them after the fact.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sun, 24 Mar 2019 18:33:38 +0100 |
parent | 9911:ed011935c22d |
child | 9913:7d78b24d8449 |
comparison
equal
deleted
inserted
replaced
9911:ed011935c22d | 9912:601f9781a605 |
---|---|
8 | 8 |
9 local jid = require "util.jid"; | 9 local jid = require "util.jid"; |
10 local st = require "util.stanza"; | 10 local st = require "util.stanza"; |
11 local dt = require "util.datetime"; | 11 local dt = require "util.datetime"; |
12 local new_queue = require "util.queue".new; | 12 local new_queue = require "util.queue".new; |
| 13 local filters = require "util.filters"; |
13 | 14 |
14 local function new_pump(output, ...) | 15 local function new_pump(output, ...) |
15 -- luacheck: ignore 212/self | 16 -- luacheck: ignore 212/self |
16 local q = new_queue(...); | 17 local q = new_queue(...); |
17 local flush = true; | 18 local flush = true; |
90 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); | 91 stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); |
91 end | 92 end |
92 return stanza; | 93 return stanza; |
93 end | 94 end |
94 | 95 |
| 96 local function manage_buffer(stanza, session) |
| 97 local ctr = session.csi_counter or 0; |
| 98 if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then |
| 99 session.conn:resume_writes(); |
| 100 else |
| 101 stanza = with_timestamp(stanza, jid.join(session.username, session.host)) |
| 102 end |
| 103 session.csi_counter = ctr + 1; |
| 104 return stanza; |
| 105 end |
| 106 |
95 module:hook("csi-client-inactive", function (event) | 107 module:hook("csi-client-inactive", function (event) |
96 local session = event.origin; | 108 local session = event.origin; |
97 if session.conn and session.conn and session.conn.pause_writes then | 109 if session.conn and session.conn and session.conn.pause_writes then |
98 session.conn:pause_writes(); | 110 session.conn:pause_writes(); |
| 111 filters.add_filter(session, "stanzas/out", manage_buffer); |
99 elseif session.pump then | 112 elseif session.pump then |
100 session.pump:pause(); | 113 session.pump:pause(); |
101 else | 114 else |
102 local bare_jid = jid.join(session.username, session.host); | 115 local bare_jid = jid.join(session.username, session.host); |
103 local send = session.send; | 116 local send = session.send; |
120 module:hook("csi-client-active", function (event) | 133 module:hook("csi-client-active", function (event) |
121 local session = event.origin; | 134 local session = event.origin; |
122 if session.pump then | 135 if session.pump then |
123 session.pump:resume(); | 136 session.pump:resume(); |
124 elseif session.conn and session.conn and session.conn.resume_writes then | 137 elseif session.conn and session.conn and session.conn.resume_writes then |
| 138 filters.remove_filter(session, "stanzas/out", manage_buffer); |
125 session.conn:resume_writes(); | 139 session.conn:resume_writes(); |
126 end | 140 end |
127 end); | 141 end); |
128 | 142 |
| 143 |
| 144 module:hook("c2s-ondrain", function (event) |
| 145 local session = event.session; |
| 146 if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then |
| 147 session.csi_counter = 0; |
| 148 session.conn:pause_writes(); |
| 149 end |
| 150 end); |