Changeset

9912:601f9781a605

mod_csi_simple: Count buffered items and flush when it reaches configured limit In this mode, stanzas have been serialized to strings in the internal net.server buffer, so it is difficult to count them after the fact.
author Kim Alvefur <zash@zash.se>
date Sun, 24 Mar 2019 18:33:38 +0100 (2019-03-24)
parents 9911:ed011935c22d
children 9913:7d78b24d8449
files plugins/mod_csi_simple.lua
diffstat 1 files changed, 22 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/plugins/mod_csi_simple.lua	Sun Mar 24 18:32:50 2019 +0100
+++ b/plugins/mod_csi_simple.lua	Sun Mar 24 18:33:38 2019 +0100
@@ -10,6 +10,7 @@
 local st = require "util.stanza";
 local dt = require "util.datetime";
 local new_queue = require "util.queue".new;
+local filters = require "util.filters";
 
 local function new_pump(output, ...)
 	-- luacheck: ignore 212/self
@@ -92,10 +93,22 @@
 	return stanza;
 end
 
+local function manage_buffer(stanza, session)
+	local ctr = session.csi_counter or 0;
+	if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
+		session.conn:resume_writes();
+	else
+		stanza = with_timestamp(stanza, jid.join(session.username, session.host))
+	end
+	session.csi_counter = ctr + 1;
+	return stanza;
+end
+
 module:hook("csi-client-inactive", function (event)
 	local session = event.origin;
 	if session.conn and session.conn and session.conn.pause_writes then
 		session.conn:pause_writes();
+		filters.add_filter(session, "stanzas/out", manage_buffer);
 	elseif session.pump then
 		session.pump:pause();
 	else
@@ -122,7 +135,16 @@
 	if session.pump then
 		session.pump:resume();
 	elseif session.conn and session.conn and session.conn.resume_writes then
+		filters.remove_filter(session, "stanzas/out", manage_buffer);
 		session.conn:resume_writes();
 	end
 end);
 
+
+module:hook("c2s-ondrain", function (event)
+	local session = event.session;
+	if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then
+		session.csi_counter = 0;
+		session.conn:pause_writes();
+	end
+end);