Comparison

plugins/mod_s2s/mod_s2s.lua @ 10120:756b8821007a

mod_s2s: Use net.connect instead of s2sout.lib for outgoing s2s connections
author Kim Alvefur <zash@zash.se>
date Sat, 10 Nov 2018 13:37:32 +0100
parent 10115:c0bd5daa9c7f
child 10226:77f900bbbf25
comparison
Legend: equal | deleted | inserted | replaced
10119:29733134c76c 10120:756b8821007a
25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; 25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
26 local s2s_destroy_session = require "core.s2smanager".destroy_session; 26 local s2s_destroy_session = require "core.s2smanager".destroy_session;
27 local uuid_gen = require "util.uuid".generate; 27 local uuid_gen = require "util.uuid".generate;
28 local fire_global_event = prosody.events.fire_event; 28 local fire_global_event = prosody.events.fire_event;
29 local runner = require "util.async".runner; 29 local runner = require "util.async".runner;
30 30 local connect = require "net.connect".connect;
31 local s2sout = module:require("s2sout"); 31 local service = require "net.resolvers.service";
32 32
33 local connect_timeout = module:get_option_number("s2s_timeout", 90); 33 local connect_timeout = module:get_option_number("s2s_timeout", 90);
34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); 34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); 35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true));
36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... 36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day...
42 local measure_ipv6 = module:measure("ipv6", "amount"); 42 local measure_ipv6 = module:measure("ipv6", "amount");
43 43
44 local sessions = module:shared("sessions"); 44 local sessions = module:shared("sessions");
45 45
46 local runner_callbacks = {}; 46 local runner_callbacks = {};
47
48 local listener = {};
47 49
48 local log = module._log; 50 local log = module._log;
49 51
50 module:hook("stats-update", function () 52 module:hook("stats-update", function ()
51 local count = 0; 53 local count = 0;
152 -- Create a new outgoing session for a stanza 154 -- Create a new outgoing session for a stanza
153 function route_to_new_session(event) 155 function route_to_new_session(event)
154 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; 156 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
155 log("debug", "opening a new outgoing connection for this stanza"); 157 log("debug", "opening a new outgoing connection for this stanza");
156 local host_session = s2s_new_outgoing(from_host, to_host); 158 local host_session = s2s_new_outgoing(from_host, to_host);
159 host_session.version = 1;
157 160
158 -- Store in buffer 161 -- Store in buffer
159 host_session.bounce_sendq = bounce_sendq; 162 host_session.bounce_sendq = bounce_sendq;
160 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; 163 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
161 log("debug", "stanza [%s] queued until connection complete", stanza.name); 164 log("debug", "stanza [%s] queued until connection complete", stanza.name);
162 s2sout.initiate_connection(host_session); 165 connect(service.new(to_host, "xmpp-server", "tcp", { default_port = 5269 }), listener, nil, { session = host_session });
163 if (not host_session.connecting) and (not host_session.conn) then
164 log("warn", "Connection to %s failed already, destroying session...", to_host);
165 s2s_destroy_session(host_session, "Connection failed");
166 return false;
167 end
168 return true; 166 return true;
169 end 167 end
170 168
171 local function keepalive(event) 169 local function keepalive(event)
172 return event.session.sends2s(' '); 170 return event.session.sends2s(' ');
476 text = condition .. (text and (" ("..text..")") or ""); 474 text = condition .. (text and (" ("..text..")") or "");
477 session.log("info", "Session closed by remote with error: %s", text); 475 session.log("info", "Session closed by remote with error: %s", text);
478 session:close(nil, text); 476 session:close(nil, text);
479 end 477 end
480 end 478 end
481
482 local listener = {};
483 479
484 --- Session methods 480 --- Session methods
485 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 481 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
486 local function session_close(session, reason, remote_reason) 482 local function session_close(session, reason, remote_reason)
487 local log = session.log or log; 483 local log = session.log or log;
677 673
678 function listener.ondisconnect(conn, err) 674 function listener.ondisconnect(conn, err)
679 local session = sessions[conn]; 675 local session = sessions[conn];
680 if session then 676 if session then
681 sessions[conn] = nil; 677 sessions[conn] = nil;
678 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
679 s2s_destroy_session(session, err);
680 end
681 end
682
683 function listener.onfail(data, err)
684 local session = data and data.session;
685 if session then
682 if err and session.direction == "outgoing" and session.notopen then 686 if err and session.direction == "outgoing" and session.notopen then
683 (session.log or log)("debug", "s2s connection attempt failed: %s", err); 687 (session.log or log)("debug", "s2s connection attempt failed: %s", err);
684 if s2sout.attempt_connection(session, err) then
685 return; -- Session lives for now
686 end
687 end 688 end
688 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); 689 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
689 s2s_destroy_session(session, err); 690 s2s_destroy_session(session, err);
690 end 691 end
691 end 692 end
703 initialize_session(session); 704 initialize_session(session);
704 end 705 end
705 706
706 function listener.ondetach(conn) 707 function listener.ondetach(conn)
707 sessions[conn] = nil; 708 sessions[conn] = nil;
709 end
710
711 function listener.onattach(conn, data)
712 local session = data and data.session;
713 if session then
714 session.conn = conn;
715 sessions[conn] = session;
716 initialize_session(session);
717 end
708 end 718 end
709 719
710 function check_auth_policy(event) 720 function check_auth_policy(event)
711 local host, session = event.host, event.session; 721 local host, session = event.host, event.session;
712 local must_secure = secure_auth; 722 local must_secure = secure_auth;
727 return false; 737 return false;
728 end 738 end
729 end 739 end
730 740
731 module:hook("s2s-check-certificate", check_auth_policy, -1); 741 module:hook("s2s-check-certificate", check_auth_policy, -1);
732
733 s2sout.set_listener(listener);
734 742
735 module:hook("server-stopping", function(event) 743 module:hook("server-stopping", function(event)
736 local reason = event.reason; 744 local reason = event.reason;
737 for _, session in pairs(sessions) do 745 for _, session in pairs(sessions) do
738 session:close{ condition = "system-shutdown", text = reason }; 746 session:close{ condition = "system-shutdown", text = reason };