# HG changeset patch
# User Kim Alvefur <zash@zash.se>
# Date 1553448818 -3600
# Node ID 601f9781a605a51346aea5683ce11c82a359150b
# Parent  ed011935c22da8043abc481f2fbf7d6fcc347218
mod_csi_simple: Count buffered items and flush when it reaches configured limit

In this mode, stanzas have been serialized to strings in the internal
net.server buffer, so it is difficult to count them after the fact.

diff -r ed011935c22d -r 601f9781a605 plugins/mod_csi_simple.lua
--- a/plugins/mod_csi_simple.lua	Sun Mar 24 18:32:50 2019 +0100
+++ b/plugins/mod_csi_simple.lua	Sun Mar 24 18:33:38 2019 +0100
@@ -10,6 +10,7 @@
 local st = require "util.stanza";
 local dt = require "util.datetime";
 local new_queue = require "util.queue".new;
+local filters = require "util.filters";
 
 local function new_pump(output, ...)
 	-- luacheck: ignore 212/self
@@ -92,10 +93,22 @@
 	return stanza;
 end
 
+local function manage_buffer(stanza, session)
+	local ctr = session.csi_counter or 0;
+	if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
+		session.conn:resume_writes();
+	else
+		stanza = with_timestamp(stanza, jid.join(session.username, session.host))
+	end
+	session.csi_counter = ctr + 1;
+	return stanza;
+end
+
 module:hook("csi-client-inactive", function (event)
 	local session = event.origin;
 	if session.conn and session.conn and session.conn.pause_writes then
 		session.conn:pause_writes();
+		filters.add_filter(session, "stanzas/out", manage_buffer);
 	elseif session.pump then
 		session.pump:pause();
 	else
@@ -122,7 +135,16 @@
 	if session.pump then
 		session.pump:resume();
 	elseif session.conn and session.conn and session.conn.resume_writes then
+		filters.remove_filter(session, "stanzas/out", manage_buffer);
 		session.conn:resume_writes();
 	end
 end);
 
+
+module:hook("c2s-ondrain", function (event)
+	local session = event.session;
+	if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then
+		session.csi_counter = 0;
+		session.conn:pause_writes();
+	end
+end);