Software /
code /
prosody
Comparison
plugins/mod_s2s/mod_s2s.lua @ 10120:756b8821007a
mod_s2s: Use net.connect instead of s2sout.lib for outgoing s2s connections
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 10 Nov 2018 13:37:32 +0100 |
parent | 10115:c0bd5daa9c7f |
child | 10226:77f900bbbf25 |
comparison
equal
deleted
inserted
replaced
10119:29733134c76c | 10120:756b8821007a |
---|---|
25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; | 25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; |
26 local s2s_destroy_session = require "core.s2smanager".destroy_session; | 26 local s2s_destroy_session = require "core.s2smanager".destroy_session; |
27 local uuid_gen = require "util.uuid".generate; | 27 local uuid_gen = require "util.uuid".generate; |
28 local fire_global_event = prosody.events.fire_event; | 28 local fire_global_event = prosody.events.fire_event; |
29 local runner = require "util.async".runner; | 29 local runner = require "util.async".runner; |
30 | 30 local connect = require "net.connect".connect; |
31 local s2sout = module:require("s2sout"); | 31 local service = require "net.resolvers.service"; |
32 | 32 |
33 local connect_timeout = module:get_option_number("s2s_timeout", 90); | 33 local connect_timeout = module:get_option_number("s2s_timeout", 90); |
34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); | 34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); |
35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); | 35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); |
36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... | 36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... |
42 local measure_ipv6 = module:measure("ipv6", "amount"); | 42 local measure_ipv6 = module:measure("ipv6", "amount"); |
43 | 43 |
44 local sessions = module:shared("sessions"); | 44 local sessions = module:shared("sessions"); |
45 | 45 |
46 local runner_callbacks = {}; | 46 local runner_callbacks = {}; |
47 | |
48 local listener = {}; | |
47 | 49 |
48 local log = module._log; | 50 local log = module._log; |
49 | 51 |
50 module:hook("stats-update", function () | 52 module:hook("stats-update", function () |
51 local count = 0; | 53 local count = 0; |
152 -- Create a new outgoing session for a stanza | 154 -- Create a new outgoing session for a stanza |
153 function route_to_new_session(event) | 155 function route_to_new_session(event) |
154 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; | 156 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; |
155 log("debug", "opening a new outgoing connection for this stanza"); | 157 log("debug", "opening a new outgoing connection for this stanza"); |
156 local host_session = s2s_new_outgoing(from_host, to_host); | 158 local host_session = s2s_new_outgoing(from_host, to_host); |
159 host_session.version = 1; | |
157 | 160 |
158 -- Store in buffer | 161 -- Store in buffer |
159 host_session.bounce_sendq = bounce_sendq; | 162 host_session.bounce_sendq = bounce_sendq; |
160 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; | 163 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; |
161 log("debug", "stanza [%s] queued until connection complete", stanza.name); | 164 log("debug", "stanza [%s] queued until connection complete", stanza.name); |
162 s2sout.initiate_connection(host_session); | 165 connect(service.new(to_host, "xmpp-server", "tcp", { default_port = 5269 }), listener, nil, { session = host_session }); |
163 if (not host_session.connecting) and (not host_session.conn) then | |
164 log("warn", "Connection to %s failed already, destroying session...", to_host); | |
165 s2s_destroy_session(host_session, "Connection failed"); | |
166 return false; | |
167 end | |
168 return true; | 166 return true; |
169 end | 167 end |
170 | 168 |
171 local function keepalive(event) | 169 local function keepalive(event) |
172 return event.session.sends2s(' '); | 170 return event.session.sends2s(' '); |
476 text = condition .. (text and (" ("..text..")") or ""); | 474 text = condition .. (text and (" ("..text..")") or ""); |
477 session.log("info", "Session closed by remote with error: %s", text); | 475 session.log("info", "Session closed by remote with error: %s", text); |
478 session:close(nil, text); | 476 session:close(nil, text); |
479 end | 477 end |
480 end | 478 end |
481 | |
482 local listener = {}; | |
483 | 479 |
484 --- Session methods | 480 --- Session methods |
485 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 481 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
486 local function session_close(session, reason, remote_reason) | 482 local function session_close(session, reason, remote_reason) |
487 local log = session.log or log; | 483 local log = session.log or log; |
677 | 673 |
678 function listener.ondisconnect(conn, err) | 674 function listener.ondisconnect(conn, err) |
679 local session = sessions[conn]; | 675 local session = sessions[conn]; |
680 if session then | 676 if session then |
681 sessions[conn] = nil; | 677 sessions[conn] = nil; |
678 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); | |
679 s2s_destroy_session(session, err); | |
680 end | |
681 end | |
682 | |
683 function listener.onfail(data, err) | |
684 local session = data and data.session; | |
685 if session then | |
682 if err and session.direction == "outgoing" and session.notopen then | 686 if err and session.direction == "outgoing" and session.notopen then |
683 (session.log or log)("debug", "s2s connection attempt failed: %s", err); | 687 (session.log or log)("debug", "s2s connection attempt failed: %s", err); |
684 if s2sout.attempt_connection(session, err) then | |
685 return; -- Session lives for now | |
686 end | |
687 end | 688 end |
688 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); | 689 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); |
689 s2s_destroy_session(session, err); | 690 s2s_destroy_session(session, err); |
690 end | 691 end |
691 end | 692 end |
703 initialize_session(session); | 704 initialize_session(session); |
704 end | 705 end |
705 | 706 |
706 function listener.ondetach(conn) | 707 function listener.ondetach(conn) |
707 sessions[conn] = nil; | 708 sessions[conn] = nil; |
709 end | |
710 | |
711 function listener.onattach(conn, data) | |
712 local session = data and data.session; | |
713 if session then | |
714 session.conn = conn; | |
715 sessions[conn] = session; | |
716 initialize_session(session); | |
717 end | |
708 end | 718 end |
709 | 719 |
710 function check_auth_policy(event) | 720 function check_auth_policy(event) |
711 local host, session = event.host, event.session; | 721 local host, session = event.host, event.session; |
712 local must_secure = secure_auth; | 722 local must_secure = secure_auth; |
727 return false; | 737 return false; |
728 end | 738 end |
729 end | 739 end |
730 | 740 |
731 module:hook("s2s-check-certificate", check_auth_policy, -1); | 741 module:hook("s2s-check-certificate", check_auth_policy, -1); |
732 | |
733 s2sout.set_listener(listener); | |
734 | 742 |
735 module:hook("server-stopping", function(event) | 743 module:hook("server-stopping", function(event) |
736 local reason = event.reason; | 744 local reason = event.reason; |
737 for _, session in pairs(sessions) do | 745 for _, session in pairs(sessions) do |
738 session:close{ condition = "system-shutdown", text = reason }; | 746 session:close{ condition = "system-shutdown", text = reason }; |