Comparison

core/s2smanager.lua @ 403:da92afa267cf

Merging with main branch.
author Tobias Markmann <tm@ayena.de>
date Sun, 23 Nov 2008 20:44:48 +0100
parent 360:e918c979ad1a
child 434:0d7ba3742f7a
comparison
equal deleted inserted replaced
402:50f1c09541cd 403:da92afa267cf
1 1
2 local hosts = hosts; 2 local hosts = hosts;
3 local sessions = sessions; 3 local sessions = sessions;
4 local socket = require "socket"; 4 local socket = require "socket";
5 local format = string.format; 5 local format = string.format;
6 local t_insert = table.insert; 6 local t_insert, t_sort = table.insert, table.sort;
7 local get_traceback = debug.traceback; 7 local get_traceback = debug.traceback;
8 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber 8 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber
9 = tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber; 9 = tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber;
10 10
11 local connlisteners_get = require "net.connlisteners".get; 11 local connlisteners_get = require "net.connlisteners".get;
22 22
23 local md5_hash = require "util.hashes".md5; 23 local md5_hash = require "util.hashes".md5;
24 24
25 local dialback_secret = "This is very secret!!! Ha!"; 25 local dialback_secret = "This is very secret!!! Ha!";
26 26
27 local srvmap = { ["gmail.com"] = "talk.google.com", ["identi.ca"] = "hampton.controlezvous.ca", ["cdr.se"] = "jabber.cdr.se" }; 27 local dns = require "net.dns";
28 28
29 module "s2smanager" 29 module "s2smanager"
30 30
31 local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end
32
31 function send_to_host(from_host, to_host, data) 33 function send_to_host(from_host, to_host, data)
34 if data.name then data = tostring(data); end
32 local host = hosts[from_host].s2sout[to_host]; 35 local host = hosts[from_host].s2sout[to_host];
33 if host then 36 if host then
34 -- We have a connection to this host already 37 -- We have a connection to this host already
35 if host.type == "s2sout_unauthed" then 38 if host.type == "s2sout_unauthed" and ((not data.xmlns) or data.xmlns == "jabber:client" or data.xmlns == "jabber:server") then
36 host.log("debug", "trying to send over unauthed s2sout to "..to_host..", authing it now..."); 39 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host..", authing it now...");
37 if not host.notopen and not host.dialback_key then 40 if not host.notopen and not host.dialback_key then
38 host.log("debug", "dialback had not been initiated"); 41 host.log("debug", "dialback had not been initiated");
39 initiate_dialback(host); 42 initiate_dialback(host);
40 end 43 end
41 44
49 else 52 else
50 (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host); 53 (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host);
51 -- FIXME 54 -- FIXME
52 if host.from_host ~= from_host then 55 if host.from_host ~= from_host then
53 log("error", "WARNING! This might, possibly, be a bug, but it might not..."); 56 log("error", "WARNING! This might, possibly, be a bug, but it might not...");
54 log("error", "We are going to send from %s instead of %s", host.from_host, from_host); 57 log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
55 end 58 end
56 host.sends2s(data); 59 host.sends2s(data);
57 host.log("debug", "stanza sent over "..host.type); 60 host.log("debug", "stanza sent over "..host.type);
58 end 61 end
59 else 62 else
71 if true then 74 if true then
72 session.trace = newproxy(true); 75 session.trace = newproxy(true);
73 getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; print("s2s session got collected, now "..open_sessions.." s2s sessions are allocated") end; 76 getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; print("s2s session got collected, now "..open_sessions.." s2s sessions are allocated") end;
74 end 77 end
75 open_sessions = open_sessions + 1; 78 open_sessions = open_sessions + 1;
76 local w = conn.write; 79 local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$"));
77 session.sends2s = function (t) w(tostring(t)); end 80 session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end
78 return session; 81 return session;
79 end 82 end
80 83
81 function new_outgoing(from_host, to_host) 84 function new_outgoing(from_host, to_host)
82 local host_session = { to_host = to_host, from_host = from_host, notopen = true, type = "s2sout_unauthed", direction = "outgoing" }; 85 local host_session = { to_host = to_host, from_host = from_host, notopen = true, type = "s2sout_unauthed", direction = "outgoing" };
83 hosts[from_host].s2sout[to_host] = host_session; 86 hosts[from_host].s2sout[to_host] = host_session;
84 local cl = connlisteners_get("xmppserver"); 87 local cl = connlisteners_get("xmppserver");
85 88
86 local conn, handler = socket.tcp() 89 local conn, handler = socket.tcp()
87 90
88 --FIXME: Below parameters (ports/ip) are incorrect (use SRV) 91 local connect_host, connect_port = to_host, 5269;
89 to_host = srvmap[to_host] or to_host; 92
93 local answer = dns.lookup("_xmpp-server._tcp."..to_host..".", "SRV");
94
95 if answer then
96 log("debug", to_host.." has SRV records, handling...");
97 local srv_hosts = {};
98 host_session.srv_hosts = srv_hosts;
99 for _, record in ipairs(answer) do
100 t_insert(srv_hosts, record.srv);
101 end
102 t_sort(srv_hosts, compare_srv_priorities);
103
104 local srv_choice = srv_hosts[1];
105 if srv_choice then
106 connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
107 log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
108 end
109 end
90 110
91 conn:settimeout(0); 111 conn:settimeout(0);
92 local success, err = conn:connect(to_host, 5269); 112 local success, err = conn:connect(connect_host, connect_port);
93 if not success then 113 if not success and err ~= "timeout" then
94 log("warn", "s2s connect() failed: %s", err); 114 log("warn", "s2s connect() failed: %s", err);
95 end 115 end
96 116
97 conn = wraptlsclient(cl, conn, to_host, 5269, 0, 1, hosts[from_host].ssl_ctx ); 117 conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, 1, hosts[from_host].ssl_ctx );
98 host_session.conn = conn; 118 host_session.conn = conn;
99 119
100 -- Register this outgoing connection so that xmppserver_listener knows about it 120 -- Register this outgoing connection so that xmppserver_listener knows about it
101 -- otherwise it will assume it is a new incoming connection 121 -- otherwise it will assume it is a new incoming connection
102 cl.register_outgoing(conn, host_session); 122 cl.register_outgoing(conn, host_session);
103 123
124 local log;
104 do 125 do
105 local conn_name = "s2sout"..tostring(conn):match("[a-f0-9]*$"); 126 local conn_name = "s2sout"..tostring(conn):match("[a-f0-9]*$");
106 host_session.log = logger_init(conn_name); 127 log = logger_init(conn_name);
128 host_session.log = log;
107 end 129 end
108 130
109 local w = conn.write; 131 local w = conn.write;
110 host_session.sends2s = function (t) w(tostring(t)); end 132 host_session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end
111 133
112 conn.write(format([[<stream:stream xmlns='jabber:server' xmlns:db='jabber:server:dialback' xmlns:stream='http://etherx.jabber.org/streams' from='%s' to='%s' version='1.0'>]], from_host, to_host)); 134 conn.write(format([[<stream:stream xmlns='jabber:server' xmlns:db='jabber:server:dialback' xmlns:stream='http://etherx.jabber.org/streams' from='%s' to='%s' version='1.0'>]], from_host, to_host));
113 135
114 return host_session; 136 return host_session;
115 end 137 end
117 function streamopened(session, attr) 139 function streamopened(session, attr)
118 local send = session.sends2s; 140 local send = session.sends2s;
119 141
120 session.version = tonumber(attr.version) or 0; 142 session.version = tonumber(attr.version) or 0;
121 if session.version >= 1.0 and not (attr.to and attr.from) then 143 if session.version >= 1.0 and not (attr.to and attr.from) then
122 print("to: "..tostring(attr.to).." from: "..tostring(attr.from)); 144 --print("to: "..tostring(attr.to).." from: "..tostring(attr.from));
123 --error(session.to_host.." failed to specify 'to' or 'from' hostname as per RFC");
124 log("warn", (session.to_host or "(unknown)").." failed to specify 'to' or 'from' hostname as per RFC"); 145 log("warn", (session.to_host or "(unknown)").." failed to specify 'to' or 'from' hostname as per RFC");
125 end 146 end
126 147
127 if session.direction == "incoming" then 148 if session.direction == "incoming" then
128 -- Send a reply stream header 149 -- Send a reply stream header
129 150
130 for k,v in pairs(attr) do print("", tostring(k), ":::", tostring(v)); end 151 --for k,v in pairs(attr) do print("", tostring(k), ":::", tostring(v)); end
131 152
132 session.to_host = attr.to; 153 session.to_host = attr.to;
133 session.from_host = attr.from; 154 session.from_host = attr.from;
134 155
135 session.streamid = uuid_gen(); 156 session.streamid = uuid_gen();
136 print(session, session.from_host, "incoming s2s stream opened"); 157 (session.log or log)("debug", "incoming s2s received <stream:stream>");
137 send("<?xml version='1.0'?>"); 158 send("<?xml version='1.0'?>");
138 send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host }):top_tag()); 159 send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host }):top_tag());
160 if session.to_host and not hosts[session.to_host] then
161 -- Attempting to connect to a host we don't serve
162 session:close("host-unknown");
163 return;
164 end
165 if session.version >= 1.0 then
166 send(st.stanza("stream:features")
167 :tag("dialback", { xmlns='urn:xmpp:features:dialback' }):tag("optional"):up():up());
168 end
139 elseif session.direction == "outgoing" then 169 elseif session.direction == "outgoing" then
140 -- If we are just using the connection for verifying dialback keys, we won't try and auth it 170 -- If we are just using the connection for verifying dialback keys, we won't try and auth it
141 if not attr.id then error("stream response did not give us a streamid!!!"); end 171 if not attr.id then error("stream response did not give us a streamid!!!"); end
142 session.streamid = attr.id; 172 session.streamid = attr.id;
143 173
145 initiate_dialback(session); 175 initiate_dialback(session);
146 else 176 else
147 mark_connected(session); 177 mark_connected(session);
148 end 178 end
149 end 179 end
150 --[[
151 local features = {};
152 modulemanager.fire_event("stream-features-s2s", session, features);
153
154 send("<stream:features>");
155
156 for _, feature in ipairs(features) do
157 send(tostring(feature));
158 end
159
160 send("</stream:features>");]]
161 180
162 session.notopen = nil; 181 session.notopen = nil;
163 end 182 end
164 183
165 function initiate_dialback(session) 184 function initiate_dialback(session)
215 end 234 end
216 end 235 end
217 236
218 function destroy_session(session) 237 function destroy_session(session)
219 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); 238 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host));
239
240 -- FIXME: Flush sendq here/report errors to originators
241
220 if session.direction == "outgoing" then 242 if session.direction == "outgoing" then
221 hosts[session.from_host].s2sout[session.to_host] = nil; 243 hosts[session.from_host].s2sout[session.to_host] = nil;
222 end 244 end
223 session.conn = nil; 245
224 session.disconnect = nil;
225 for k in pairs(session) do 246 for k in pairs(session) do
226 if k ~= "trace" then 247 if k ~= "trace" then
227 session[k] = nil; 248 session[k] = nil;
228 end 249 end
229 end 250 end