Comparison

plugins/mod_s2s.lua @ 13554:902d25cd0557

mod_s2s: Limit size of outgoing stanza queue. This queue is used to buffer stanzas while waiting for an outgoing s2s connection to be established. Limit it to prevent excessive memory usage. The default is chosen to approximate how many average-sized stanzas fit in the server_epoll default max_send_buffer_size of 32 MiB. When the queue is full, a custom error is sent back instead of the default core.stanza_router error "Communication with remote domains is not enabled", which does not describe what is happening here. Closes #1106
author Kim Alvefur <zash@zash.se>
date Sat, 09 Nov 2024 16:47:14 +0100
parent 13534:d532176d4334
child 13555:42b98ee73ca8
comparison
equal deleted inserted replaced
13553:850e4ade7a01 13554:902d25cd0557
11 local prosody = prosody; 11 local prosody = prosody;
12 local hosts = prosody.hosts; 12 local hosts = prosody.hosts;
13 local core_process_stanza = prosody.core_process_stanza; 13 local core_process_stanza = prosody.core_process_stanza;
14 14
15 local tostring, type = tostring, type; 15 local tostring, type = tostring, type;
16 local t_insert = table.insert;
17 local traceback = debug.traceback; 16 local traceback = debug.traceback;
18 17
19 local add_task = require "prosody.util.timer".add_task; 18 local add_task = require "prosody.util.timer".add_task;
20 local stop_timer = require "prosody.util.timer".stop; 19 local stop_timer = require "prosody.util.timer".stop;
21 local st = require "prosody.util.stanza"; 20 local st = require "prosody.util.stanza";
31 local connect = require "prosody.net.connect".connect; 30 local connect = require "prosody.net.connect".connect;
32 local service = require "prosody.net.resolvers.service"; 31 local service = require "prosody.net.resolvers.service";
33 local resolver_chain = require "prosody.net.resolvers.chain"; 32 local resolver_chain = require "prosody.net.resolvers.chain";
34 local errors = require "prosody.util.error"; 33 local errors = require "prosody.util.error";
35 local set = require "prosody.util.set"; 34 local set = require "prosody.util.set";
35 local queue = require "prosody.util.queue";
36 36
37 local connect_timeout = module:get_option_period("s2s_timeout", 90); 37 local connect_timeout = module:get_option_period("s2s_timeout", 90);
38 local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5); 38 local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5);
39 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); 39 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true));
40 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... 40 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day...
41 local secure_domains, insecure_domains = 41 local secure_domains, insecure_domains =
42 module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; 42 module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items;
43 local require_encryption = module:get_option_boolean("s2s_require_encryption", true); 43 local require_encryption = module:get_option_boolean("s2s_require_encryption", true);
44 local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000); 44 local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000);
45 local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1);
45 46
46 local advertised_idle_timeout = 14*60; -- default in all net.server implementations 47 local advertised_idle_timeout = 14*60; -- default in all net.server implementations
47 local network_settings = module:get_option("network_settings"); 48 local network_settings = module:get_option("network_settings");
48 if type(network_settings) == "table" and type(network_settings.read_timeout) == "number" then 49 if type(network_settings) == "table" and type(network_settings.read_timeout) == "number" then
49 advertised_idle_timeout = network_settings.read_timeout; 50 advertised_idle_timeout = network_settings.read_timeout;
156 if errors.is_error(reason) then 157 if errors.is_error(reason) then
157 error_type, condition, reason_text = reason.type, reason.condition, reason.text; 158 error_type, condition, reason_text = reason.type, reason.condition, reason.text;
158 elseif type(reason) == "string" then 159 elseif type(reason) == "string" then
159 reason_text = reason; 160 reason_text = reason;
160 end 161 end
161 for i, stanza in ipairs(sendq) do 162 for stanza in sendq:consume() do
162 if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then 163 if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then
163 local reply = st.error_reply( 164 local reply = st.error_reply(
164 stanza, 165 stanza,
165 error_type, 166 error_type,
166 condition, 167 condition,
168 ); 169 );
169 core_process_stanza(dummy, reply); 170 core_process_stanza(dummy, reply);
170 else 171 else
171 (session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag()); 172 (session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag());
172 end 173 end
173 sendq[i] = nil;
174 end 174 end
175 session.sendq = nil; 175 session.sendq = nil;
176 end 176 end
177 177
178 -- Handles stanzas to existing s2s sessions 178 -- Handles stanzas to existing s2s sessions
192 -- We have a connection to this host already 192 -- We have a connection to this host already
193 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then 193 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
194 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); 194 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
195 195
196 -- Queue stanza until we are able to send it 196 -- Queue stanza until we are able to send it
197 if host.sendq then 197 if not host.sendq then
198 t_insert(host.sendq, st.clone(stanza));
199 else
200 -- luacheck: ignore 122 198 -- luacheck: ignore 122
201 host.sendq = { st.clone(stanza) }; 199 host.sendq = queue.new(sendq_size);
200 end
201 if not host.sendq:push(st.clone(stanza)) then
202 host.log("warn", "stanza [%s] not queued ", stanza.name);
203 event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full"));
204 return true;
202 end 205 end
203 host.log("debug", "stanza [%s] queued ", stanza.name); 206 host.log("debug", "stanza [%s] queued ", stanza.name);
204 return true; 207 return true;
205 elseif host.type == "local" or host.type == "component" then 208 elseif host.type == "local" or host.type == "component" then
206 log("error", "Trying to send a stanza to ourselves??") 209 log("error", "Trying to send a stanza to ourselves??")
221 local host_session = s2s_new_outgoing(from_host, to_host); 224 local host_session = s2s_new_outgoing(from_host, to_host);
222 host_session.version = 1; 225 host_session.version = 1;
223 226
224 -- Store in buffer 227 -- Store in buffer
225 host_session.bounce_sendq = bounce_sendq; 228 host_session.bounce_sendq = bounce_sendq;
226 host_session.sendq = { st.clone(stanza) }; 229 host_session.sendq = queue.new(sendq_size);
230 host_session.sendq:push(stanza);
227 log("debug", "stanza [%s] queued until connection complete", stanza.name); 231 log("debug", "stanza [%s] queued until connection complete", stanza.name);
228 -- FIXME Cleaner solution to passing extra data from resolvers to net.server 232 -- FIXME Cleaner solution to passing extra data from resolvers to net.server
229 -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records 233 -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records
230 module:context(from_host):fire_event("s2sout-created", { session = host_session }); 234 module:context(from_host):fire_event("s2sout-created", { session = host_session });
231 local xmpp_extra = setmetatable({}, s2s_service_options_mt); 235 local xmpp_extra = setmetatable({}, s2s_service_options_mt);
362 366
363 if session.direction == "outgoing" then 367 if session.direction == "outgoing" then
364 if sendq then 368 if sendq then
365 session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); 369 session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
366 local send = session.sends2s; 370 local send = session.sends2s;
367 for i, stanza in ipairs(sendq) do 371 for stanza in sendq:consume() do
372 -- TODO check send success
368 send(stanza); 373 send(stanza);
369 sendq[i] = nil;
370 end 374 end
371 session.sendq = nil; 375 session.sendq = nil;
372 end 376 end
373 end 377 end
374 378