Changeset

13554:902d25cd0557

mod_s2s: Limit size of outgoing stanza queue This queue is used to buffer stanzas while waiting for an outgoing s2s connection to be established. Limit it to prevent excessive memory usage. Default chosen to approximate how many average stanzas fits in the server_epoll default max_send_buffer_size of 32 MiB Returns a custom error instead of the default core.stanza_router "Communication with remote domains is not enabled" from is sent back, which does not describe what is happening here. Closes #1106
author Kim Alvefur <zash@zash.se>
date Sat, 09 Nov 2024 16:47:14 +0100
parents 13553:850e4ade7a01
children 13555:42b98ee73ca8
files plugins/mod_s2s.lua
diffstat 1 files changed, 14 insertions(+), 10 deletions(-) [+]
line wrap: on
line diff
--- a/plugins/mod_s2s.lua	Sat Nov 09 15:42:31 2024 +0100
+++ b/plugins/mod_s2s.lua	Sat Nov 09 16:47:14 2024 +0100
@@ -13,7 +13,6 @@
 local core_process_stanza = prosody.core_process_stanza;
 
 local tostring, type = tostring, type;
-local t_insert = table.insert;
 local traceback = debug.traceback;
 
 local add_task = require "prosody.util.timer".add_task;
@@ -33,6 +32,7 @@
 local resolver_chain = require "prosody.net.resolvers.chain";
 local errors = require "prosody.util.error";
 local set = require "prosody.util.set";
+local queue = require "prosody.util.queue";
 
 local connect_timeout = module:get_option_period("s2s_timeout", 90);
 local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5);
@@ -42,6 +42,7 @@
 	module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items;
 local require_encryption = module:get_option_boolean("s2s_require_encryption", true);
 local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000);
+local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1);
 
 local advertised_idle_timeout = 14*60; -- default in all net.server implementations
 local network_settings = module:get_option("network_settings");
@@ -158,7 +159,7 @@
 	elseif type(reason) == "string" then
 		reason_text = reason;
 	end
-	for i, stanza in ipairs(sendq) do
+	for stanza in sendq:consume() do
 		if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then
 			local reply = st.error_reply(
 				stanza,
@@ -170,7 +171,6 @@
 		else
 			(session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag());
 		end
-		sendq[i] = nil;
 	end
 	session.sendq = nil;
 end
@@ -194,11 +194,14 @@
 		(host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
 
 		-- Queue stanza until we are able to send it
-		if host.sendq then
-			t_insert(host.sendq, st.clone(stanza));
-		else
+		if not host.sendq then
 			-- luacheck: ignore 122
-			host.sendq = { st.clone(stanza) };
+			host.sendq = queue.new(sendq_size);
+		end
+		if not host.sendq:push(st.clone(stanza)) then
+			host.log("warn", "stanza [%s] not queued ", stanza.name);
+			event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full"));
+			return true;
 		end
 		host.log("debug", "stanza [%s] queued ", stanza.name);
 		return true;
@@ -223,7 +226,8 @@
 
 	-- Store in buffer
 	host_session.bounce_sendq = bounce_sendq;
-	host_session.sendq = { st.clone(stanza) };
+	host_session.sendq = queue.new(sendq_size);
+	host_session.sendq:push(stanza);
 	log("debug", "stanza [%s] queued until connection complete", stanza.name);
 	-- FIXME Cleaner solution to passing extra data from resolvers to net.server
 	-- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records
@@ -364,9 +368,9 @@
 		if sendq then
 			session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
 			local send = session.sends2s;
-			for i, stanza in ipairs(sendq) do
+			for stanza in sendq:consume() do
+				-- TODO check send success
 				send(stanza);
-				sendq[i] = nil;
 			end
 			session.sendq = nil;
 		end