Software /
code /
prosody
Comparison
plugins/mod_s2s.lua @ 13554:902d25cd0557
mod_s2s: Limit size of outgoing stanza queue
This queue is used to buffer stanzas while waiting for an outgoing s2s
connection to be established.
Limit it to prevent excessive memory usage.
Default chosen to approximate how many average stanzas fit in the
server_epoll default max_send_buffer_size of 32 MiB.
Returns a custom error instead of letting the default core.stanza_router
error, "Communication with remote domains is not enabled", be sent back,
since that message does not describe what is happening here.
Closes #1106
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 09 Nov 2024 16:47:14 +0100 |
parent | 13534:d532176d4334 |
child | 13555:42b98ee73ca8 |
comparison
equal
deleted
inserted
replaced
13553:850e4ade7a01 | 13554:902d25cd0557 |
---|---|
11 local prosody = prosody; | 11 local prosody = prosody; |
12 local hosts = prosody.hosts; | 12 local hosts = prosody.hosts; |
13 local core_process_stanza = prosody.core_process_stanza; | 13 local core_process_stanza = prosody.core_process_stanza; |
14 | 14 |
15 local tostring, type = tostring, type; | 15 local tostring, type = tostring, type; |
16 local t_insert = table.insert; | |
17 local traceback = debug.traceback; | 16 local traceback = debug.traceback; |
18 | 17 |
19 local add_task = require "prosody.util.timer".add_task; | 18 local add_task = require "prosody.util.timer".add_task; |
20 local stop_timer = require "prosody.util.timer".stop; | 19 local stop_timer = require "prosody.util.timer".stop; |
21 local st = require "prosody.util.stanza"; | 20 local st = require "prosody.util.stanza"; |
31 local connect = require "prosody.net.connect".connect; | 30 local connect = require "prosody.net.connect".connect; |
32 local service = require "prosody.net.resolvers.service"; | 31 local service = require "prosody.net.resolvers.service"; |
33 local resolver_chain = require "prosody.net.resolvers.chain"; | 32 local resolver_chain = require "prosody.net.resolvers.chain"; |
34 local errors = require "prosody.util.error"; | 33 local errors = require "prosody.util.error"; |
35 local set = require "prosody.util.set"; | 34 local set = require "prosody.util.set"; |
35 local queue = require "prosody.util.queue"; | |
36 | 36 |
37 local connect_timeout = module:get_option_period("s2s_timeout", 90); | 37 local connect_timeout = module:get_option_period("s2s_timeout", 90); |
38 local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5); | 38 local stream_close_timeout = module:get_option_period("s2s_close_timeout", 5); |
39 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); | 39 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); |
40 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... | 40 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... |
41 local secure_domains, insecure_domains = | 41 local secure_domains, insecure_domains = |
42 module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; | 42 module:get_option_set("s2s_secure_domains", {})._items, module:get_option_set("s2s_insecure_domains", {})._items; |
43 local require_encryption = module:get_option_boolean("s2s_require_encryption", true); | 43 local require_encryption = module:get_option_boolean("s2s_require_encryption", true); |
44 local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000); | 44 local stanza_size_limit = module:get_option_integer("s2s_stanza_size_limit", 1024*512, 10000); |
45 local sendq_size = module:get_option_integer("s2s_send_queue_size", 1024*32, 1); | |
45 | 46 |
46 local advertised_idle_timeout = 14*60; -- default in all net.server implementations | 47 local advertised_idle_timeout = 14*60; -- default in all net.server implementations |
47 local network_settings = module:get_option("network_settings"); | 48 local network_settings = module:get_option("network_settings"); |
48 if type(network_settings) == "table" and type(network_settings.read_timeout) == "number" then | 49 if type(network_settings) == "table" and type(network_settings.read_timeout) == "number" then |
49 advertised_idle_timeout = network_settings.read_timeout; | 50 advertised_idle_timeout = network_settings.read_timeout; |
156 if errors.is_error(reason) then | 157 if errors.is_error(reason) then |
157 error_type, condition, reason_text = reason.type, reason.condition, reason.text; | 158 error_type, condition, reason_text = reason.type, reason.condition, reason.text; |
158 elseif type(reason) == "string" then | 159 elseif type(reason) == "string" then |
159 reason_text = reason; | 160 reason_text = reason; |
160 end | 161 end |
161 for i, stanza in ipairs(sendq) do | 162 for stanza in sendq:consume() do |
162 if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then | 163 if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] and stanza.attr.type ~= "error" and stanza.attr.type ~= "result" then |
163 local reply = st.error_reply( | 164 local reply = st.error_reply( |
164 stanza, | 165 stanza, |
165 error_type, | 166 error_type, |
166 condition, | 167 condition, |
168 ); | 169 ); |
169 core_process_stanza(dummy, reply); | 170 core_process_stanza(dummy, reply); |
170 else | 171 else |
171 (session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag()); | 172 (session.log or log)("debug", "Not eligible for bouncing, discarding %s", stanza:top_tag()); |
172 end | 173 end |
173 sendq[i] = nil; | |
174 end | 174 end |
175 session.sendq = nil; | 175 session.sendq = nil; |
176 end | 176 end |
177 | 177 |
178 -- Handles stanzas to existing s2s sessions | 178 -- Handles stanzas to existing s2s sessions |
192 -- We have a connection to this host already | 192 -- We have a connection to this host already |
193 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then | 193 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then |
194 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); | 194 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); |
195 | 195 |
196 -- Queue stanza until we are able to send it | 196 -- Queue stanza until we are able to send it |
197 if host.sendq then | 197 if not host.sendq then |
198 t_insert(host.sendq, st.clone(stanza)); | |
199 else | |
200 -- luacheck: ignore 122 | 198 -- luacheck: ignore 122 |
201 host.sendq = { st.clone(stanza) }; | 199 host.sendq = queue.new(sendq_size); |
200 end | |
201 if not host.sendq:push(st.clone(stanza)) then | |
202 host.log("warn", "stanza [%s] not queued ", stanza.name); | |
203 event.origin.send(st.error_reply(stanza, "wait", "resource-constraint", "Outgoing stanza queue full")); | |
204 return true; | |
202 end | 205 end |
203 host.log("debug", "stanza [%s] queued ", stanza.name); | 206 host.log("debug", "stanza [%s] queued ", stanza.name); |
204 return true; | 207 return true; |
205 elseif host.type == "local" or host.type == "component" then | 208 elseif host.type == "local" or host.type == "component" then |
206 log("error", "Trying to send a stanza to ourselves??") | 209 log("error", "Trying to send a stanza to ourselves??") |
221 local host_session = s2s_new_outgoing(from_host, to_host); | 224 local host_session = s2s_new_outgoing(from_host, to_host); |
222 host_session.version = 1; | 225 host_session.version = 1; |
223 | 226 |
224 -- Store in buffer | 227 -- Store in buffer |
225 host_session.bounce_sendq = bounce_sendq; | 228 host_session.bounce_sendq = bounce_sendq; |
226 host_session.sendq = { st.clone(stanza) }; | 229 host_session.sendq = queue.new(sendq_size); |
230 host_session.sendq:push(stanza); | |
227 log("debug", "stanza [%s] queued until connection complete", stanza.name); | 231 log("debug", "stanza [%s] queued until connection complete", stanza.name); |
228 -- FIXME Cleaner solution to passing extra data from resolvers to net.server | 232 -- FIXME Cleaner solution to passing extra data from resolvers to net.server |
229 -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records | 233 -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records |
230 module:context(from_host):fire_event("s2sout-created", { session = host_session }); | 234 module:context(from_host):fire_event("s2sout-created", { session = host_session }); |
231 local xmpp_extra = setmetatable({}, s2s_service_options_mt); | 235 local xmpp_extra = setmetatable({}, s2s_service_options_mt); |
362 | 366 |
363 if session.direction == "outgoing" then | 367 if session.direction == "outgoing" then |
364 if sendq then | 368 if sendq then |
365 session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); | 369 session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); |
366 local send = session.sends2s; | 370 local send = session.sends2s; |
367 for i, stanza in ipairs(sendq) do | 371 for stanza in sendq:consume() do |
372 -- TODO check send success | |
368 send(stanza); | 373 send(stanza); |
369 sendq[i] = nil; | |
370 end | 374 end |
371 session.sendq = nil; | 375 session.sendq = nil; |
372 end | 376 end |
373 end | 377 end |
374 | 378 |