Software / code / prosody
Comparison
plugins/mod_s2s/mod_s2s.lua @ 10120:756b8821007a
mod_s2s: Use net.connect instead of s2sout.lib for outgoing s2s connections
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Sat, 10 Nov 2018 13:37:32 +0100 |
| parent | 10115:c0bd5daa9c7f |
| child | 10226:77f900bbbf25 |
comparison
equal
deleted
inserted
replaced
| 10119:29733134c76c | 10120:756b8821007a |
|---|---|
| 25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; | 25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; |
| 26 local s2s_destroy_session = require "core.s2smanager".destroy_session; | 26 local s2s_destroy_session = require "core.s2smanager".destroy_session; |
| 27 local uuid_gen = require "util.uuid".generate; | 27 local uuid_gen = require "util.uuid".generate; |
| 28 local fire_global_event = prosody.events.fire_event; | 28 local fire_global_event = prosody.events.fire_event; |
| 29 local runner = require "util.async".runner; | 29 local runner = require "util.async".runner; |
| 30 | 30 local connect = require "net.connect".connect; |
| 31 local s2sout = module:require("s2sout"); | 31 local service = require "net.resolvers.service"; |
| 32 | 32 |
| 33 local connect_timeout = module:get_option_number("s2s_timeout", 90); | 33 local connect_timeout = module:get_option_number("s2s_timeout", 90); |
| 34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); | 34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); |
| 35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); | 35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); |
| 36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... | 36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... |
| 42 local measure_ipv6 = module:measure("ipv6", "amount"); | 42 local measure_ipv6 = module:measure("ipv6", "amount"); |
| 43 | 43 |
| 44 local sessions = module:shared("sessions"); | 44 local sessions = module:shared("sessions"); |
| 45 | 45 |
| 46 local runner_callbacks = {}; | 46 local runner_callbacks = {}; |
| 47 | |
| 48 local listener = {}; | |
| 47 | 49 |
| 48 local log = module._log; | 50 local log = module._log; |
| 49 | 51 |
| 50 module:hook("stats-update", function () | 52 module:hook("stats-update", function () |
| 51 local count = 0; | 53 local count = 0; |
| 152 -- Create a new outgoing session for a stanza | 154 -- Create a new outgoing session for a stanza |
| 153 function route_to_new_session(event) | 155 function route_to_new_session(event) |
| 154 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; | 156 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; |
| 155 log("debug", "opening a new outgoing connection for this stanza"); | 157 log("debug", "opening a new outgoing connection for this stanza"); |
| 156 local host_session = s2s_new_outgoing(from_host, to_host); | 158 local host_session = s2s_new_outgoing(from_host, to_host); |
| 159 host_session.version = 1; | |
| 157 | 160 |
| 158 -- Store in buffer | 161 -- Store in buffer |
| 159 host_session.bounce_sendq = bounce_sendq; | 162 host_session.bounce_sendq = bounce_sendq; |
| 160 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; | 163 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; |
| 161 log("debug", "stanza [%s] queued until connection complete", stanza.name); | 164 log("debug", "stanza [%s] queued until connection complete", stanza.name); |
| 162 s2sout.initiate_connection(host_session); | 165 connect(service.new(to_host, "xmpp-server", "tcp", { default_port = 5269 }), listener, nil, { session = host_session }); |
| 163 if (not host_session.connecting) and (not host_session.conn) then | |
| 164 log("warn", "Connection to %s failed already, destroying session...", to_host); | |
| 165 s2s_destroy_session(host_session, "Connection failed"); | |
| 166 return false; | |
| 167 end | |
| 168 return true; | 166 return true; |
| 169 end | 167 end |
| 170 | 168 |
| 171 local function keepalive(event) | 169 local function keepalive(event) |
| 172 return event.session.sends2s(' '); | 170 return event.session.sends2s(' '); |
| 476 text = condition .. (text and (" ("..text..")") or ""); | 474 text = condition .. (text and (" ("..text..")") or ""); |
| 477 session.log("info", "Session closed by remote with error: %s", text); | 475 session.log("info", "Session closed by remote with error: %s", text); |
| 478 session:close(nil, text); | 476 session:close(nil, text); |
| 479 end | 477 end |
| 480 end | 478 end |
| 481 | |
| 482 local listener = {}; | |
| 483 | 479 |
| 484 --- Session methods | 480 --- Session methods |
| 485 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 481 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
| 486 local function session_close(session, reason, remote_reason) | 482 local function session_close(session, reason, remote_reason) |
| 487 local log = session.log or log; | 483 local log = session.log or log; |
| 677 | 673 |
| 678 function listener.ondisconnect(conn, err) | 674 function listener.ondisconnect(conn, err) |
| 679 local session = sessions[conn]; | 675 local session = sessions[conn]; |
| 680 if session then | 676 if session then |
| 681 sessions[conn] = nil; | 677 sessions[conn] = nil; |
| 678 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); | |
| 679 s2s_destroy_session(session, err); | |
| 680 end | |
| 681 end | |
| 682 | |
| 683 function listener.onfail(data, err) | |
| 684 local session = data and data.session; | |
| 685 if session then | |
| 682 if err and session.direction == "outgoing" and session.notopen then | 686 if err and session.direction == "outgoing" and session.notopen then |
| 683 (session.log or log)("debug", "s2s connection attempt failed: %s", err); | 687 (session.log or log)("debug", "s2s connection attempt failed: %s", err); |
| 684 if s2sout.attempt_connection(session, err) then | |
| 685 return; -- Session lives for now | |
| 686 end | |
| 687 end | 688 end |
| 688 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); | 689 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); |
| 689 s2s_destroy_session(session, err); | 690 s2s_destroy_session(session, err); |
| 690 end | 691 end |
| 691 end | 692 end |
| 703 initialize_session(session); | 704 initialize_session(session); |
| 704 end | 705 end |
| 705 | 706 |
| 706 function listener.ondetach(conn) | 707 function listener.ondetach(conn) |
| 707 sessions[conn] = nil; | 708 sessions[conn] = nil; |
| 709 end | |
| 710 | |
| 711 function listener.onattach(conn, data) | |
| 712 local session = data and data.session; | |
| 713 if session then | |
| 714 session.conn = conn; | |
| 715 sessions[conn] = session; | |
| 716 initialize_session(session); | |
| 717 end | |
| 708 end | 718 end |
| 709 | 719 |
| 710 function check_auth_policy(event) | 720 function check_auth_policy(event) |
| 711 local host, session = event.host, event.session; | 721 local host, session = event.host, event.session; |
| 712 local must_secure = secure_auth; | 722 local must_secure = secure_auth; |
| 727 return false; | 737 return false; |
| 728 end | 738 end |
| 729 end | 739 end |
| 730 | 740 |
| 731 module:hook("s2s-check-certificate", check_auth_policy, -1); | 741 module:hook("s2s-check-certificate", check_auth_policy, -1); |
| 732 | |
| 733 s2sout.set_listener(listener); | |
| 734 | 742 |
| 735 module:hook("server-stopping", function(event) | 743 module:hook("server-stopping", function(event) |
| 736 local reason = event.reason; | 744 local reason = event.reason; |
| 737 for _, session in pairs(sessions) do | 745 for _, session in pairs(sessions) do |
| 738 session:close{ condition = "system-shutdown", text = reason }; | 746 session:close{ condition = "system-shutdown", text = reason }; |