Comparison

plugins/mod_s2s.lua @ 12462:11765f0605ec

mod_s2s: Store real stanzas in session.sendq, rather than strings This is the "right" thing to do. Strings were more memory-efficient, but e.g. bypassed stanza filters at reconnection time. Also not being stanzas prevents us from potential future work, such as merging sendq with mod_smacks. Regarding performance: we should counter the probable negative effect of this change with other positive changes that are desired anyway - e.g. a limit on the size of the sendq, improved in-memory representation of stanzas, s2s backoff (e.g. if a remote server is persistently unreachable, cache this failure for a while and don't just keep forever queuing stanzas for it).
author Matthew Wild <mwild1@gmail.com>
date Wed, 23 Mar 2022 15:25:22 +0000
parent 12362:0fd58f54d653
child 12472:48121960983e
comparison
equal deleted inserted replaced
12461:972671506251 12462:11765f0605ec
144 if errors.is_err(reason) then 144 if errors.is_err(reason) then
145 error_type, condition, reason_text = reason.type, reason.condition, reason.text; 145 error_type, condition, reason_text = reason.type, reason.condition, reason.text;
146 elseif type(reason) == "string" then 146 elseif type(reason) == "string" then
147 reason_text = reason; 147 reason_text = reason;
148 end 148 end
149 for i, data in ipairs(sendq) do 149 for i, stanza in ipairs(sendq) do
150 local reply = data[2]; 150 if not stanza.attr.xmlns and bouncy_stanzas[stanza.name] then
151 if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then 151 local reply = st.error_reply(
152 reply.attr.type = "error"; 152 stanza,
153 reply:tag("error", {type = error_type, by = session.from_host}) 153 error_type,
154 :tag(condition, {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up(); 154 condition,
155 if reason_text then 155 reason_text and ("Server-to-server connection failed: "..reason_text) or nil
156 reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}) 156 );
157 :text("Server-to-server connection failed: "..reason_text):up();
158 end
159 core_process_stanza(dummy, reply); 157 core_process_stanza(dummy, reply);
160 end 158 end
161 sendq[i] = nil; 159 sendq[i] = nil;
162 end 160 end
163 session.sendq = nil; 161 session.sendq = nil;
180 -- We have a connection to this host already 178 -- We have a connection to this host already
181 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then 179 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
182 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); 180 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
183 181
184 -- Queue stanza until we are able to send it 182 -- Queue stanza until we are able to send it
185 local queued_item = {
186 tostring(stanza),
187 stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza);
188 };
189 if host.sendq then 183 if host.sendq then
190 t_insert(host.sendq, queued_item); 184 t_insert(host.sendq, st.clone(stanza));
191 else 185 else
192 -- luacheck: ignore 122 186 -- luacheck: ignore 122
193 host.sendq = { queued_item }; 187 host.sendq = { st.clone(stanza) };
194 end 188 end
195 host.log("debug", "stanza [%s] queued ", stanza.name); 189 host.log("debug", "stanza [%s] queued ", stanza.name);
196 return true; 190 return true;
197 elseif host.type == "local" or host.type == "component" then 191 elseif host.type == "local" or host.type == "component" then
198 log("error", "Trying to send a stanza to ourselves??") 192 log("error", "Trying to send a stanza to ourselves??")
213 local host_session = s2s_new_outgoing(from_host, to_host); 207 local host_session = s2s_new_outgoing(from_host, to_host);
214 host_session.version = 1; 208 host_session.version = 1;
215 209
216 -- Store in buffer 210 -- Store in buffer
217 host_session.bounce_sendq = bounce_sendq; 211 host_session.bounce_sendq = bounce_sendq;
218 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; 212 host_session.sendq = { st.clone(stanza) };
219 log("debug", "stanza [%s] queued until connection complete", stanza.name); 213 log("debug", "stanza [%s] queued until connection complete", stanza.name);
220 -- FIXME Cleaner solution to passing extra data from resolvers to net.server 214 -- FIXME Cleaner solution to passing extra data from resolvers to net.server
221 -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records 215 -- This mt-clone allows resolvers to add extra data, currently used for DANE TLSA records
222 module:context(from_host):fire_event("s2sout-created", { session = host_session }); 216 module:context(from_host):fire_event("s2sout-created", { session = host_session });
223 local xmpp_extra = setmetatable({}, s2s_service_options_mt); 217 local xmpp_extra = setmetatable({}, s2s_service_options_mt);
322 316
323 if session.direction == "outgoing" then 317 if session.direction == "outgoing" then
324 if sendq then 318 if sendq then
325 session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host); 319 session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #sendq, session.to_host);
326 local send = session.sends2s; 320 local send = session.sends2s;
327 for i, data in ipairs(sendq) do 321 for i, stanza in ipairs(sendq) do
328 send(data[1]); 322 send(stanza);
329 sendq[i] = nil; 323 sendq[i] = nil;
330 end 324 end
331 session.sendq = nil; 325 session.sendq = nil;
332 end 326 end
333 end 327 end