Comparison

plugins/mod_s2s.lua @ 10879:5c7bb2440f53

mod_s2s: Move out of empty directory mod_s2s.lua had been all alone in there since the removal of s2sout.lib.lua in 756b8821007a
author Kim Alvefur <zash@zash.se>
date Tue, 02 Jun 2020 19:43:50 +0200
parent 10850:plugins/mod_s2s/mod_s2s.lua@bd2814f900dd
child 11019:d1604721b665
comparison
equal deleted inserted replaced
10878:b37dc3776f69 10879:5c7bb2440f53
1 -- Prosody IM
2 -- Copyright (C) 2008-2010 Matthew Wild
3 -- Copyright (C) 2008-2010 Waqas Hussain
4 --
5 -- This project is MIT/X11 licensed. Please see the
6 -- COPYING file in the source package for more information.
7 --
8
9 module:set_global();
10
11 local prosody = prosody;
12 local hosts = prosody.hosts;
13 local core_process_stanza = prosody.core_process_stanza;
14
15 local tostring, type = tostring, type;
16 local t_insert = table.insert;
17 local traceback = debug.traceback;
18
19 local add_task = require "util.timer".add_task;
20 local st = require "util.stanza";
21 local initialize_filters = require "util.filters".initialize;
22 local nameprep = require "util.encodings".stringprep.nameprep;
23 local new_xmpp_stream = require "util.xmppstream".new;
24 local s2s_new_incoming = require "core.s2smanager".new_incoming;
25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
26 local s2s_destroy_session = require "core.s2smanager".destroy_session;
27 local uuid_gen = require "util.uuid".generate;
28 local fire_global_event = prosody.events.fire_event;
29 local runner = require "util.async".runner;
30 local connect = require "net.connect".connect;
31 local service = require "net.resolvers.service";
32 local errors = require "util.error";
33 local set = require "util.set";
34
-- Timeouts, read from the "s2s_timeout" and "s2s_close_timeout" options
local connect_timeout = module:get_option_number("s2s_timeout", 90);
local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);

-- "s2s_tcp_keepalives" falls back to the generic "tcp_keepalives" option
local default_keepalives = module:get_option_boolean("tcp_keepalives", true);
local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", default_keepalives);

-- Authentication / encryption policy
local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day...
local secure_domains = module:get_option_set("s2s_secure_domains", {})._items;
local insecure_domains = module:get_option_set("s2s_insecure_domains", {})._items;
local require_encryption = module:get_option_boolean("s2s_require_encryption", false);

-- TODO come up with a sensible default (util.xmppstream defaults to 10M)
local stanza_size_limit = module:get_option_number("s2s_stanza_size_limit");

-- Metrics reported from the "stats-update" hook
local measure_connections = module:measure("connections", "amount");
local measure_ipv6 = module:measure("ipv6", "amount");

-- Connection -> session map, obtained through module:shared
local sessions = module:shared("sessions");

-- Filled in further down; declared here so earlier functions can refer to them
local runner_callbacks = {};
local listener = {};

local log = module._log;

-- Options passed to the service resolver when opening outgoing connections
local s2s_service_options = {
	default_port = 5269;
	use_ipv4 = module:get_option_boolean("use_ipv4", true);
	use_ipv6 = module:get_option_boolean("use_ipv6", true);
};
60
-- On every "stats-update" event, report the number of tracked sessions and how
-- many of them have an IP address containing a colon (counted as IPv6).
module:hook("stats-update", function ()
	local total, over_ipv6 = 0, 0;
	for _, s2s_session in pairs(sessions) do
		total = total + 1;
		local ip = s2s_session.ip;
		if ip and ip:match(":") then
			over_ipv6 = over_ipv6 + 1;
		end
	end
	measure_connections(total);
	measure_ipv6(over_ipv6);
end);
73
74 --- Handle stanzas to remote domains
75
-- Stanza kinds for which an error reply is generated when a queue is bounced
local bouncy_stanzas = { message = true, presence = true, iq = true };

-- Answer every stanza still waiting in session.sendq with an error reply and
-- then discard the queue.
--   session: s2s session whose queue is bounced (nothing happens without a sendq)
--   reason:  optional util.error object or plain string describing the failure
local function bounce_sendq(session, reason)
	local queue = session.sendq;
	if not queue then
		return;
	end
	session.log("info", "Sending error replies for %d queued stanzas because of failed outgoing connection to %s", #queue, session.to_host);

	-- Stand-in origin handed to core_process_stanza. Nothing is expected to
	-- send through it or close it, so both operations only log an error.
	local dummy = {
		type = "s2sin";
		dummy = true;
		send = function ()
			(session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", traceback());
		end;
		close = function ()
			(session.log or log)("error", "Attempting to close the dummy origin of s2s error replies, please report this! Traceback: %s", traceback());
		end;
	};

	-- FIXME Allow for more specific error conditions
	-- TODO use util.error ?
	local error_type, condition, reason_text = "cancel", "remote-server-not-found", nil;
	if session.had_stream then -- set when a stream is opened by the remote
		error_type, condition = "wait", "remote-server-timeout";
	end
	if errors.is_err(reason) then
		-- A structured error overrides the defaults chosen above
		error_type, condition, reason_text = reason.type, reason.condition, reason.text;
	elseif type(reason) == "string" then
		reason_text = reason;
	end

	for index, item in ipairs(queue) do
		-- item[2] is the pre-built reply, or false/nil when none was prepared
		local reply = item[2];
		if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
			reply.attr.type = "error";
			reply:tag("error", {type = error_type, by = session.from_host})
				:tag(condition, {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
			if reason_text then
				reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
					:text("Server-to-server connection failed: "..reason_text):up();
			end
			core_process_stanza(dummy, reply);
		end
		queue[index] = nil;
	end
	session.sendq = nil;
end
120
-- Handles stanzas to existing s2s sessions.
--   event: table with from_host, to_host and stanza fields
-- Returns true when the stanza was queued or sent, false when routing is
-- refused, and nothing when there is no existing session for to_host (or the
-- send did not succeed), so that a later handler can take over.
function route_to_existing_session(event)
	local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
	if not hosts[from_host] then
		log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
		return false;
	end
	if hosts[to_host] then
		-- The name that is wrongly being treated as remote is the destination
		log("warn", "Attempt to route stanza to a remote %s - a host we do serve?!", to_host);
		return false;
	end
	local host = hosts[from_host].s2sout[to_host];
	if not host then return end

	-- We have a connection to this host already
	if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
		local host_log = host.log or log;
		host_log("debug", "trying to send over unauthed s2sout to "..to_host);

		-- Queue stanza until we are able to send it. The second slot holds a
		-- reply to bounce on failure; none is prepared for errors and results.
		local queued_item = {
			tostring(stanza),
			stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza);
		};
		if host.sendq then
			t_insert(host.sendq, queued_item);
		else
			-- luacheck: ignore 122
			host.sendq = { queued_item };
		end
		host_log("debug", "stanza [%s] queued ", stanza.name);
		return true;
	elseif host.type == "local" or host.type == "component" then
		log("error", "Trying to send a stanza to ourselves??")
		log("error", "Traceback: %s", traceback());
		log("error", "Stanza: %s", stanza);
		return false;
	else
		if host.sends2s(stanza) then
			return true;
		end
	end
end
163
-- Create a new outgoing session for a stanza: the stanza is placed in the new
-- session's queue and a connection attempt to to_host is started.
--   event: table with from_host, to_host and stanza fields
-- Always returns true.
function route_to_new_session(event)
	local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
	log("debug", "opening a new outgoing connection for this stanza");

	local outgoing = s2s_new_outgoing(from_host, to_host);
	outgoing.version = 1;

	-- Store in buffer: serialized stanza plus a reply to bounce on failure
	-- (no reply is prepared for error and result stanzas)
	outgoing.bounce_sendq = bounce_sendq;
	local serialized = tostring(stanza);
	local bounce_reply = stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza);
	outgoing.sendq = { { serialized, bounce_reply } };
	log("debug", "stanza [%s] queued until connection complete", stanza.name);

	local resolver = service.new(to_host, "xmpp-server", "tcp", s2s_service_options);
	connect(resolver, listener, nil, { session = outgoing });
	return true;
end
178
-- Handler for "s2s-read-timeout": writes a single space to the session and
-- returns whatever sends2s returns.
local function keepalive(event)
	local idle_session = event.session;
	return idle_session.sends2s(' ');
end

module:hook("s2s-read-timeout", keepalive, -1);
184
-- Per-host setup: installs the routing, authentication, keepalive and
-- stream-features handlers on the given host module.
-- Returns nil plus a message when the host has "disallow_s2s" set.
function module.add_host(module)
	if module:get_option_boolean("disallow_s2s", false) then
		module:log("warn", "The 'disallow_s2s' config option is deprecated, please see https://prosody.im/doc/s2s#disabling");
		return nil, "This host has disallow_s2s set";
	end

	-- Reacts to <stream:features> that no other handler claimed
	local function on_stream_features(session, stanza) -- luacheck: ignore 212/stanza
		if session.type == "s2sout" then
			-- Stream is authenticated and we are seem to be done with feature negotiation,
			-- so the stream is ready for stanzas. RFC 6120 Section 4.3
			mark_connected(session);
			return true;
		end
		if require_encryption and not session.secure then
			session.log("warn", "Encrypted server-to-server communication is required but was not offered by %s", session.to_host);
			session:close({
				condition = "policy-violation",
				text = "Encrypted server-to-server communication is required but was not offered",
			}, nil, "Could not establish encrypted connection to remote server");
			return true;
		end
		if not session.dialback_verifying then
			session.log("warn", "No SASL EXTERNAL offer and Dialback doesn't seem to be enabled, giving up");
			session:close({
				condition = "unsupported-feature",
				text = "No viable authentication method offered",
			}, nil, "No viable authentication method offered by remote server");
			return true;
		end
	end

	module:hook("route/remote", route_to_existing_session, -1);
	module:hook("route/remote", route_to_new_session, -10);
	module:hook("s2s-authenticated", make_authenticated, -1);
	module:hook("s2s-read-timeout", keepalive, -1);
	module:hook_stanza("http://etherx.jabber.org/streams", "features", on_stream_features, -1);
end
217
-- Stream is authorised, and ready for normal stanzas.
-- Fires the "s2sout-established" / "s2sin-established" events, installs
-- session.send where applicable and, for outgoing sessions, flushes the
-- stanzas that were queued while the connection was being set up.
function mark_connected(session)
	-- Grab the queue before any event handler runs
	local pending = session.sendq;
	local from, to = session.from_host, session.to_host;

	local direction_label = session.direction:gsub("^.", string.upper);
	session.log("info", "%s s2s connection %s->%s complete", direction_label, from, to);

	local event_data = { session = session };
	if session.type == "s2sout" then
		fire_global_event("s2sout-established", event_data);
		hosts[from].events.fire_event("s2sout-established", event_data);

		if session.incoming then
			session.send = function(stanza)
				return hosts[from].events.fire_event("route/remote", { from_host = from, to_host = to, stanza = stanza });
			end;
		end
	else
		if session.outgoing and not hosts[to].s2sout[from] then
			session.log("debug", "Setting up to handle route from %s to %s", to, from);
			hosts[to].s2sout[from] = session; -- luacheck: ignore 122
		end
		-- Replies travel in the opposite direction: from our host (to) to the peer (from)
		local local_host = hosts[to];
		session.send = function(stanza)
			return local_host.events.fire_event("route/remote", { from_host = to, to_host = from, stanza = stanza });
		end;

		fire_global_event("s2sin-established", event_data);
		hosts[to].events.fire_event("s2sin-established", event_data);
	end

	if session.direction == "outgoing" and pending then
		session.log("debug", "sending %d queued stanzas across new outgoing connection to %s", #pending, session.to_host);
		local write = session.sends2s;
		for index, item in ipairs(pending) do
			write(item[1]); -- item[1] is the serialized stanza
			pending[index] = nil;
		end
		session.sendq = nil;
	end
end
264
-- Handler for "s2s-authenticated": enforces the encryption policy, promotes
-- the session from its *_unauthed type and records the authenticated host.
--   event: table with session and host fields
-- Returns false when the session is in none of the expected states, true otherwise.
function make_authenticated(event)
	local session, host = event.session, event.host;

	if not session.secure and (require_encryption or (secure_auth and not(insecure_domains[host])) or secure_domains[host]) then
		local verb = (session.direction == "outgoing" and "offered") or "used";
		session:close({
			condition = "policy-violation",
			text = "Encrypted server-to-server communication is required but was not "..verb
		}, nil, "Could not establish encrypted connection to remote server");
	end
	if hosts[host] then
		session:close({ condition = "undefined-condition", text = "Attempt to authenticate as a host we serve" });
	end

	-- Promote the session; any type other than the four handled here is rejected
	local current_type = session.type;
	if current_type == "s2sout_unauthed" then
		session.type = "s2sout";
	elseif current_type == "s2sin_unauthed" then
		session.type = "s2sin";
	elseif current_type ~= "s2sin" and current_type ~= "s2sout" then
		return false;
	end

	if session.incoming and host then
		local host_state = session.hosts[host];
		if not host_state then
			host_state = {};
			session.hosts[host] = host_state;
		end
		host_state.authed = true;
	end
	session.log("debug", "connection %s->%s is now authenticated for %s", session.from_host, session.to_host, host);

	if (session.type == "s2sout" and session.external_auth ~= "succeeded") or session.type == "s2sin" then
		-- Stream either used dialback for authentication or is an incoming stream.
		mark_connected(session);
	end

	return true;
end
300
--- Helper to check that a session peer's certificate is valid.
-- Fetches the peer certificate from the socket when the socket offers
-- getpeercertificate, then fires "s2s-check-certificate" and returns the
-- result of that event.
function check_cert_status(session)
	-- The peer is the host we connect to (outgoing) or the one connecting to us
	local peer_host = session.direction == "outgoing" and session.to_host or session.from_host

	local sock = session.conn:socket()
	local peer_cert
	if sock.getpeercertificate then
		peer_cert = sock:getpeercertificate()
	end

	return module:fire_event("s2s-check-certificate", { host = peer_host, session = session, cert = peer_cert });
end
312
313 --- XMPP stream event handlers
314
-- Callback table handed to the XMPP stream parser (see initialize_session)
local stream_callbacks = { default_ns = "jabber:server" };

-- Passes an incoming stanza through the "stanzas/in" filter and hands the
-- result to the session's runner.
function stream_callbacks.handlestanza(session, stanza)
	local filtered = session.filter("stanzas/in", stanza);
	session.thread:run(filtered);
end

local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";

-- Queues a stream-opened marker so that _streamopened runs in async context
function stream_callbacks.streamopened(session, attr)
	local marker = { stream = "opened", attr = attr };
	session.thread:run(marker);
end
328
-- Processes a stream header received from the remote server. Called from the
-- session's runner when a stream-opened marker is dequeued.
--   session: the s2s session the header arrived on
--   attr:    attribute table of the received stream header
function stream_callbacks._streamopened(session, attr)
	session.version = tonumber(attr.version) or 0;
	session.had_stream = true; -- Had a stream opened at least once

	-- TODO: Rename session.secure to session.encrypted
	if session.secure == false then
		-- secure == false is turned into true on the next stream header seen
		session.secure = true;
		session.encrypted = true;

		local tls_socket = session.conn:socket();
		if tls_socket.info then
			local tls_info = tls_socket:info();
			(session.log or log)("info", "Stream encrypted (%s with %s)", tls_info.protocol, tls_info.cipher);
			session.compressed = tls_info.compression;
		else
			(session.log or log)("info", "Stream encrypted");
		end
	end

	if session.direction == "incoming" then
		-- Send a reply stream header

		-- Validate to/from: nameprep yields a false value for invalid input
		local to = attr.to and nameprep(attr.to);
		local from = attr.from and nameprep(attr.from);
		if attr.to and not to then -- COMPAT: Some servers do not reliably set 'to' (especially on stream restarts)
			session:close({ condition = "improper-addressing", text = "Invalid 'to' address" });
			return;
		end
		if attr.from and not from then -- COMPAT: Some servers do not reliably set 'from' (especially on stream restarts)
			session:close({ condition = "improper-addressing", text = "Invalid 'from' address" });
			return;
		end

		-- Set session.[from/to]_host if they have not been set already and if
		-- this session isn't already authenticated
		if session.type == "s2sin_unauthed" and from and not session.from_host then
			session.from_host = from;
		elseif from ~= session.from_host then
			session:close({ condition = "improper-addressing", text = "New stream 'from' attribute does not match original" });
			return;
		end
		if session.type == "s2sin_unauthed" and to and not session.to_host then
			session.to_host = to;
		elseif to ~= session.to_host then
			session:close({ condition = "improper-addressing", text = "New stream 'to' attribute does not match original" });
			return;
		end

		-- For convenience we'll put the sanitised values into these variables
		to, from = session.to_host, session.from_host;

		session.streamid = uuid_gen();
		(session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag());

		if to and not hosts[to] then
			-- Attempting to connect to a host we don't serve
			session:close({
				condition = "host-unknown";
				text = "This host does not serve "..to
			});
			return;
		end
		if to and not hosts[to].modules.s2s then
			-- Attempting to connect to a host that disallows s2s
			session:close({
				condition = "policy-violation";
				text = "Server-to-server communication is disabled for this host";
			});
			return;
		end

		if hosts[from] then
			session:close({ condition = "undefined-condition", text = "Attempt to connect from a host we serve" });
			return;
		end

		-- Stop here when the certificate check explicitly returns false
		if session.secure and not session.cert_chain_status and check_cert_status(session) == false then
			return;
		end

		session:open_stream(session.to_host, session.from_host)
		session.notopen = nil;
		if session.version >= 1.0 then
			local features = st.stanza("stream:features");

			if to then
				hosts[to].events.fire_event("s2s-stream-features", { origin = session, features = features });
			else
				(session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", from or session.ip or "unknown host");
				fire_global_event("s2s-stream-features-legacy", { origin = session, features = features });
			end

			-- Authenticated sessions get the features element even when it is empty
			local authenticated = session.type == "s2sin" or session.type == "s2sout";
			if authenticated or features.tags[1] then
				log("debug", "Sending stream features: %s", features);
				session.sends2s(features);
			else
				(session.log or log)("warn", "No stream features to offer, giving up");
				session:close({ condition = "undefined-condition", text = "No stream features to offer" });
			end
		end
	elseif session.direction == "outgoing" then
		session.notopen = nil;
		if not attr.id then
			log("warn", "Stream response did not give us a stream id!");
			session:close({ condition = "undefined-condition", text = "Missing stream ID" });
			return;
		end
		session.streamid = attr.id;

		-- Stop here when the certificate check explicitly returns false
		if session.secure and not session.cert_chain_status and check_cert_status(session) == false then
			return;
		end

		-- If server is pre-1.0, don't wait for features, just do dialback
		if session.version < 1.0 then
			if session.dialback_verifying then
				mark_connected(session);
			else
				hosts[session.from_host].events.fire_event("s2sout-authenticate-legacy", { origin = session });
			end
		end
	end
end
458
-- Runs in the session's runner once the remote has closed its stream:
-- logs the event and closes our side with reason `false`.
function stream_callbacks._streamclosed(session)
	local session_log = session.log or log;
	session_log("debug", "Received </stream:stream>");
	session:close(false);
end

-- Queues a stream-closed marker so that _streamclosed runs in async context
function stream_callbacks.streamclosed(session, attr)
	local marker = { stream = "closed", attr = attr };
	session.thread:run(marker);
end
468
-- Handles errors reported by the stream parser.
--   session: the s2s session the error occurred on
--   error:   error kind; "no-stream", "parse-error" and "stream-error" are handled
--   data:    detail accompanying the error: the offending header for
--            "no-stream", the <stream:error/> element for "stream-error"
function stream_callbacks.error(session, error, data)
	local session_log = session.log or log;
	if error == "no-stream" then
		session_log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}")));
		session:close("invalid-namespace");
	elseif error == "parse-error" then
		-- Log the accompanying detail; `error` is always the literal
		-- "parse-error" in this branch and carries no information.
		session_log("debug", "Server-to-server XML parse error: %s", data);
		session:close("not-well-formed");
	elseif error == "stream-error" then
		-- Pick the condition element and the optional <text/> out of the error
		local condition, text = "undefined-condition";
		for child in data:childtags(nil, xmlns_xmpp_streams) do
			if child.name ~= "text" then
				condition = child.name;
			else
				text = child:get_text();
			end
			if condition ~= "undefined-condition" and text then
				break;
			end
		end
		text = condition .. (text and (" ("..text..")") or "");
		session_log("info", "Session closed by remote with error: %s", text);
		session:close(nil, text);
	end
end
493
494 --- Session methods
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};

-- Closes an s2s session: opens the stream first if that has not happened yet,
-- sends a stream error when a reason is given, sends the closing stream tag
-- and destroys the session. Does nothing when the session has no connection.
--   reason: stream error to send to the remote server (string condition or
--           table with condition/text/extra); nil == no error, initiated by us;
--           false == initiated by remote
--   remote_reason: stream error received from the remote server
--   bounce_reason: stanza error to pass to bounce_sendq because stream- and stanza errors are different
local function session_close(session, reason, remote_reason, bounce_reason)
	local log = session.log or log;
	if not session.conn then
		return;
	end

	if session.notopen then
		if session.direction == "incoming" then
			session:open_stream(session.to_host, session.from_host);
		else
			session:open_stream(session.from_host, session.to_host);
		end
	end
	if reason then -- nil == no err, initiated by us, false == initiated by remote
		local stream_error;
		if type(reason) == "string" then -- assume stream error
			stream_error = st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
		elseif type(reason) == "table" and not st.is_stanza(reason) then
			stream_error = st.stanza("stream:error"):tag(reason.condition or "undefined-condition", stream_xmlns_attr):up();
			if reason.text then
				stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
			end
			if reason.extra then
				stream_error:add_child(reason.extra);
			end
		end
		if st.is_stanza(stream_error) then
			-- to and from are never unknown on outgoing connections; for an
			-- incoming one without a known from_host, fall back to the IP
			log("debug", "Disconnecting %s->%s[%s], <stream:error> is: %s",
				session.from_host or session.ip or "(unknown host)", session.to_host or "(unknown host)", session.type, reason);
			session.sends2s(stream_error);
		end
	end

	session.sends2s("</stream:stream>");
	-- Nothing may be written after the closing tag
	function session.sends2s() return false; end

	-- luacheck: ignore 422/reason
	-- FIXME reason should be managed in a place common to c2s, s2s, bosh, component etc
	local reason = remote_reason or (reason and (reason.text or reason.condition)) or reason;
	log("info", "%s s2s stream %s->%s closed: %s", session.direction:gsub("^.", string.upper),
		session.from_host or "(unknown host)", session.to_host or "(unknown host)", reason or "stream closed");

	-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
	local conn = session.conn;
	if reason == nil and not session.notopen and session.incoming then
		add_task(stream_close_timeout, function ()
			if not session.destroyed then
				log("warn", "Failed to receive a stream close response, closing connection anyway...");
				s2s_destroy_session(session, reason, bounce_reason);
				conn:close();
			end
		end);
	else
		s2s_destroy_session(session, reason, bounce_reason);
		conn:close(); -- Close immediately, as this is an outgoing connection or is not authed
	end
end
555
-- Adjusts the attributes of an outgoing stream header in place.
--   from / to: addresses the header is sent from / to (either may be nil)
--   attr:      attribute table to modify
-- The dialback namespace is declared when `from` is missing or names a local
-- host that has the dialback module; missing addresses become empty strings.
function session_stream_attrs(session, from, to, attr) -- luacheck: ignore 212/session
	local declare_dialback = true;
	if from then
		local local_host = hosts[from];
		declare_dialback = local_host and local_host.modules.dialback;
	end
	if declare_dialback then
		attr["xmlns:db"] = 'jabber:server:dialback';
	end

	if not from then
		attr.from = '';
	end
	if not to then
		attr.to = '';
	end
end
567
-- Session initialization logic shared by incoming and outgoing.
-- Attaches the XML stream, the runner, the send/receive functions and the
-- close method to the session, fires "s2s-created" and arms the connect timeout.
local function initialize_session(session)
	local stream = new_xmpp_stream(session, stream_callbacks, stanza_size_limit);

	-- The runner receives either real stanzas or the marker tables queued by
	-- stream_callbacks.streamopened / streamclosed
	session.thread = runner(function (item)
		if st.is_stanza(item) then
			core_process_stanza(session, item);
		elseif item.stream == "opened" then
			stream_callbacks._streamopened(session, item.attr);
		elseif item.stream == "closed" then
			stream_callbacks._streamclosed(session, item.attr);
		end
	end, runner_callbacks, session);

	local log = session.log or log;
	session.stream = stream;

	session.notopen = true;

	-- Puts the session back into the "no stream header seen yet" state
	function session.reset_stream()
		session.notopen = true;
		session.streamid = nil;
		session.stream:reset();
	end

	session.stream_attrs = session_stream_attrs;

	local filter = initialize_filters(session);
	local conn = session.conn;
	local write = conn.write;

	-- Sends a stanza or a raw string: stanzas pass the "stanzas/out" filter,
	-- everything passes the "bytes/out" filter; a filter may drop the data.
	function session.sends2s(payload)
		log("debug", "Sending[%s]: %s", session.type, payload.top_tag and payload:top_tag() or payload:match("^[^>]*>?"));
		if payload.name then
			payload = filter("stanzas/out", payload);
		end
		if not payload then
			return;
		end
		local bytes = filter("bytes/out", tostring(payload));
		if bytes then
			return write(conn, bytes);
		end
	end

	-- Feeds received bytes to the stream parser; closes the session on failure
	function session.data(data)
		data = filter("bytes/in", data);
		if not data then
			return;
		end
		local ok, err = stream:feed(data);
		if ok then return; end
		log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
		if err == "stanza-too-large" then
			session:close({ condition = "policy-violation", text = "XML stanza is too big" }, nil, "Received invalid XML from remote server");
		else
			session:close("not-well-formed", nil, "Received invalid XML from remote server");
		end
	end

	session.close = session_close;

	local handlestanza = stream_callbacks.handlestanza;
	function session.dispatch_stanza(session, stanza) -- luacheck: ignore 432/session
		return handlestanza(session, stanza);
	end

	module:fire_event("s2s-created", { session = session });

	-- Give up on sessions that have not finished connecting within connect_timeout
	add_task(connect_timeout, function ()
		local session_type = session.type;
		if session_type == "s2sin" or session_type == "s2sout" then
			return; -- Ok, we're connected
		elseif session_type == "s2s_destroyed" then
			return; -- Session already destroyed
		end
		-- Not connected, need to close session and clean up
		(session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
			session.from_host or "(unknown)", session.to_host or "(unknown)");
		session:close("connection-timeout");
	end);
end
647
-- Runner is ready for more work: resume reading from the connection.
-- self.data is the session that was passed to runner() as its data argument.
function runner_callbacks:ready()
	local session = self.data;
	session.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
	session.conn:resume();
end

-- Runner is waiting: pause reading from the connection
function runner_callbacks:waiting()
	local session = self.data;
	session.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
	session.conn:pause();
end

-- Runner reported an error: log it
function runner_callbacks:error(err)
	local session_log = self.data.log or log;
	session_log("error", "Traceback[s2s]: %s", err);
end
661
-- A connection was established. A connection that is already in `sessions`
-- is an outgoing one and gets its stream header sent; an unknown connection
-- is a new incoming one and gets a fresh session.
function listener.onconnect(conn)
	conn:setoption("keepalive", opt_keepalives);
	local session = sessions[conn];
	if session then -- Outgoing session connected
		session:open_stream(session.from_host, session.to_host);
	else -- New incoming connection
		session = s2s_new_incoming(conn);
		sessions[conn] = session;
		session.log("debug", "Incoming s2s connection");
		initialize_session(session);
	end
	session.ip = conn:ip();
end
675
-- Data arrived on a connection: pass it to the owning session, if there is one
function listener.onincoming(conn, data)
	local session = sessions[conn];
	if not session then
		return;
	end
	session.data(data);
end
682
-- Connection status change: once the TLS handshake completes on an outgoing
-- session, send the stream header again.
function listener.onstatus(conn, status)
	if status ~= "ssl-handshake-complete" then
		return;
	end
	local session = sessions[conn];
	if session and session.direction == "outgoing" then
		session.log("debug", "Sending stream header...");
		session:open_stream(session.from_host, session.to_host);
	end
end
692
-- Connection was closed: forget the connection and destroy its session
function listener.ondisconnect(conn, err)
	local session = sessions[conn];
	if not session then
		return;
	end
	sessions[conn] = nil;
	(session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
	if err and session.secure == false then
		-- secure is still false, so the error is attributed to encryption negotiation
		-- TODO util.error-ify this
		err = "Error during negotiation of encrypted connection: "..err;
	end
	s2s_destroy_session(session, err);
end
705
-- Connection attempt failed: log it and destroy the session carried in `data`
function listener.onfail(data, err)
	local session = data and data.session;
	if not session then
		return;
	end
	local session_log = session.log or log;
	if err and session.direction == "outgoing" and session.notopen then
		session_log("debug", "s2s connection attempt failed: %s", err);
	end
	session_log("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
	s2s_destroy_session(session, err);
end
716
-- Read timeout on a connection: fire "s2s-read-timeout" on the session's host
-- (session.host or session.to_host), or globally when that host is not local,
-- and return the event's result.
function listener.onreadtimeout(conn)
	local session = sessions[conn];
	if not session then
		return;
	end
	local host = session.host or session.to_host;
	local event_source = hosts[host] or prosody;
	return event_source.events.fire_event("s2s-read-timeout", { session = session });
end
724
-- Associates an outgoing connection with its session and initializes the session
function listener.register_outgoing(conn, session)
	sessions[conn] = session;
	initialize_session(session);
end

-- The listener no longer handles this connection: drop it from the session map
function listener.ondetach(conn)
	sessions[conn] = nil;
end
733
-- The listener was attached to a connection: bind the session carried in
-- `data` (if any) to the connection and initialize it.
function listener.onattach(conn, data)
	local session = data and data.session;
	if not session then
		return;
	end
	session.conn = conn;
	sessions[conn] = session;
	initialize_session(session);
end
742
-- Complete the sentence "Your certificate " with what's wrong.
-- Looks at session.cert_chain_status, session.cert_chain_errors and
-- session.cert_identity_status.
local function friendly_cert_error(session) --> string
	if session.cert_chain_status ~= "invalid" then
		if session.cert_identity_status == "invalid" then
			return "is not valid for this name";
		end
		-- this should normally be unreachable except if no s2s auth module was loaded
		return "could not be validated";
	end

	local chain_errors = session.cert_chain_errors;
	if chain_errors then
		-- Only the first entry of the error list is inspected
		local first_errors = set.new(chain_errors[1]);
		if first_errors:contains("certificate has expired") then
			return "has expired";
		elseif first_errors:contains("self signed certificate") then
			return "is self-signed";
		end
	end
	return "is not trusted"; -- for some other reason
end
761
-- Handler for "s2s-check-certificate": when a valid certificate is required
-- for the host and the session's certificate is not valid, the session is
-- closed and false is returned. Otherwise nothing is returned.
--   event: table with host and session fields
function check_auth_policy(event)
	local host, session = event.host, event.session;

	-- s2s_secure_auth sets the default; the per-domain sets override it
	local must_secure;
	if secure_auth then
		must_secure = not insecure_domains[host];
	else
		must_secure = secure_domains[host] and true or false;
	end

	local cert_valid = session.cert_chain_status == "valid" and session.cert_identity_status == "valid";
	if must_secure and not cert_valid then
		local reason = friendly_cert_error(session);
		session.log("warn", "Forbidding insecure connection to/from %s because its certificate %s", host or session.ip or "(unknown host)", reason);
		-- XEP-0178 recommends closing outgoing connections without warning
		-- but does not give a rationale for this.
		-- In practice most cases are configuration mistakes or forgotten
		-- certificate renewals. We think it's better to let the other party
		-- know about the problem so that they can fix it.
		session:close({ condition = "not-authorized", text = "Your server's certificate "..reason },
			nil, "Remote server's certificate "..reason);
		return false;
	end
end

module:hook("s2s-check-certificate", check_auth_policy, -1);
787
-- On "server-stopping", close every tracked session with a system-shutdown
-- stream error carrying the shutdown reason as its text.
module:hook("server-stopping", function(event)
	local shutdown_text = event.reason;
	for _, s2s_session in pairs(sessions) do
		s2s_session:close({ condition = "system-shutdown", text = shutdown_text });
	end
end, -200);
794
795
796
-- Register this module as the provider of the "s2s" network service
do
	local ssl_config = { -- FIXME This is not used atm, see mod_tls
		verify = { "peer", "client_once", };
	};

	-- Pattern used to recognise server-to-server streams on multiplexed ports
	local multiplex = {
		protocol = "xmpp-server";
		pattern = "^<.*:stream.*%sxmlns%s*=%s*(['\"])jabber:server%1.*>";
	};

	module:provides("net", {
		name = "s2s";
		listener = listener;
		default_port = 5269;
		encryption = "starttls";
		ssl_config = ssl_config;
		multiplex = multiplex;
	});
end
810