Software /
code /
prosody
Comparison
core/s2smanager.lua @ 403:da92afa267cf
Merging with main branch.
author | Tobias Markmann <tm@ayena.de> |
---|---|
date | Sun, 23 Nov 2008 20:44:48 +0100 |
parent | 360:e918c979ad1a |
child | 434:0d7ba3742f7a |
comparison
equal
deleted
inserted
replaced
402:50f1c09541cd | 403:da92afa267cf |
---|---|
1 | 1 |
2 local hosts = hosts; | 2 local hosts = hosts; |
3 local sessions = sessions; | 3 local sessions = sessions; |
4 local socket = require "socket"; | 4 local socket = require "socket"; |
5 local format = string.format; | 5 local format = string.format; |
6 local t_insert = table.insert; | 6 local t_insert, t_sort = table.insert, table.sort; |
7 local get_traceback = debug.traceback; | 7 local get_traceback = debug.traceback; |
8 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber | 8 local tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber |
9 = tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber; | 9 = tostring, pairs, ipairs, getmetatable, print, newproxy, error, tonumber; |
10 | 10 |
11 local connlisteners_get = require "net.connlisteners".get; | 11 local connlisteners_get = require "net.connlisteners".get; |
22 | 22 |
23 local md5_hash = require "util.hashes".md5; | 23 local md5_hash = require "util.hashes".md5; |
24 | 24 |
25 local dialback_secret = "This is very secret!!! Ha!"; | 25 local dialback_secret = "This is very secret!!! Ha!"; |
26 | 26 |
27 local srvmap = { ["gmail.com"] = "talk.google.com", ["identi.ca"] = "hampton.controlezvous.ca", ["cdr.se"] = "jabber.cdr.se" }; | 27 local dns = require "net.dns"; |
28 | 28 |
29 module "s2smanager" | 29 module "s2smanager" |
30 | 30 |
31 local function compare_srv_priorities(a,b) return a.priority < b.priority or a.weight < b.weight; end | |
32 | |
31 function send_to_host(from_host, to_host, data) | 33 function send_to_host(from_host, to_host, data) |
34 if data.name then data = tostring(data); end | |
32 local host = hosts[from_host].s2sout[to_host]; | 35 local host = hosts[from_host].s2sout[to_host]; |
33 if host then | 36 if host then |
34 -- We have a connection to this host already | 37 -- We have a connection to this host already |
35 if host.type == "s2sout_unauthed" then | 38 if host.type == "s2sout_unauthed" and ((not data.xmlns) or data.xmlns == "jabber:client" or data.xmlns == "jabber:server") then |
36 host.log("debug", "trying to send over unauthed s2sout to "..to_host..", authing it now..."); | 39 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host..", authing it now..."); |
37 if not host.notopen and not host.dialback_key then | 40 if not host.notopen and not host.dialback_key then |
38 host.log("debug", "dialback had not been initiated"); | 41 host.log("debug", "dialback had not been initiated"); |
39 initiate_dialback(host); | 42 initiate_dialback(host); |
40 end | 43 end |
41 | 44 |
49 else | 52 else |
50 (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host); | 53 (host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host); |
51 -- FIXME | 54 -- FIXME |
52 if host.from_host ~= from_host then | 55 if host.from_host ~= from_host then |
53 log("error", "WARNING! This might, possibly, be a bug, but it might not..."); | 56 log("error", "WARNING! This might, possibly, be a bug, but it might not..."); |
54 log("error", "We are going to send from %s instead of %s", host.from_host, from_host); | 57 log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); |
55 end | 58 end |
56 host.sends2s(data); | 59 host.sends2s(data); |
57 host.log("debug", "stanza sent over "..host.type); | 60 host.log("debug", "stanza sent over "..host.type); |
58 end | 61 end |
59 else | 62 else |
71 if true then | 74 if true then |
72 session.trace = newproxy(true); | 75 session.trace = newproxy(true); |
73 getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; print("s2s session got collected, now "..open_sessions.." s2s sessions are allocated") end; | 76 getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; print("s2s session got collected, now "..open_sessions.." s2s sessions are allocated") end; |
74 end | 77 end |
75 open_sessions = open_sessions + 1; | 78 open_sessions = open_sessions + 1; |
76 local w = conn.write; | 79 local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$")); |
77 session.sends2s = function (t) w(tostring(t)); end | 80 session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end |
78 return session; | 81 return session; |
79 end | 82 end |
80 | 83 |
81 function new_outgoing(from_host, to_host) | 84 function new_outgoing(from_host, to_host) |
82 local host_session = { to_host = to_host, from_host = from_host, notopen = true, type = "s2sout_unauthed", direction = "outgoing" }; | 85 local host_session = { to_host = to_host, from_host = from_host, notopen = true, type = "s2sout_unauthed", direction = "outgoing" }; |
83 hosts[from_host].s2sout[to_host] = host_session; | 86 hosts[from_host].s2sout[to_host] = host_session; |
84 local cl = connlisteners_get("xmppserver"); | 87 local cl = connlisteners_get("xmppserver"); |
85 | 88 |
86 local conn, handler = socket.tcp() | 89 local conn, handler = socket.tcp() |
87 | 90 |
88 --FIXME: Below parameters (ports/ip) are incorrect (use SRV) | 91 local connect_host, connect_port = to_host, 5269; |
89 to_host = srvmap[to_host] or to_host; | 92 |
93 local answer = dns.lookup("_xmpp-server._tcp."..to_host..".", "SRV"); | |
94 | |
95 if answer then | |
96 log("debug", to_host.." has SRV records, handling..."); | |
97 local srv_hosts = {}; | |
98 host_session.srv_hosts = srv_hosts; | |
99 for _, record in ipairs(answer) do | |
100 t_insert(srv_hosts, record.srv); | |
101 end | |
102 t_sort(srv_hosts, compare_srv_priorities); | |
103 | |
104 local srv_choice = srv_hosts[1]; | |
105 if srv_choice then | |
106 connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; | |
107 log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port); | |
108 end | |
109 end | |
90 | 110 |
91 conn:settimeout(0); | 111 conn:settimeout(0); |
92 local success, err = conn:connect(to_host, 5269); | 112 local success, err = conn:connect(connect_host, connect_port); |
93 if not success then | 113 if not success and err ~= "timeout" then |
94 log("warn", "s2s connect() failed: %s", err); | 114 log("warn", "s2s connect() failed: %s", err); |
95 end | 115 end |
96 | 116 |
97 conn = wraptlsclient(cl, conn, to_host, 5269, 0, 1, hosts[from_host].ssl_ctx ); | 117 conn = wraptlsclient(cl, conn, connect_host, connect_port, 0, 1, hosts[from_host].ssl_ctx ); |
98 host_session.conn = conn; | 118 host_session.conn = conn; |
99 | 119 |
100 -- Register this outgoing connection so that xmppserver_listener knows about it | 120 -- Register this outgoing connection so that xmppserver_listener knows about it |
101 -- otherwise it will assume it is a new incoming connection | 121 -- otherwise it will assume it is a new incoming connection |
102 cl.register_outgoing(conn, host_session); | 122 cl.register_outgoing(conn, host_session); |
103 | 123 |
124 local log; | |
104 do | 125 do |
105 local conn_name = "s2sout"..tostring(conn):match("[a-f0-9]*$"); | 126 local conn_name = "s2sout"..tostring(conn):match("[a-f0-9]*$"); |
106 host_session.log = logger_init(conn_name); | 127 log = logger_init(conn_name); |
128 host_session.log = log; | |
107 end | 129 end |
108 | 130 |
109 local w = conn.write; | 131 local w = conn.write; |
110 host_session.sends2s = function (t) w(tostring(t)); end | 132 host_session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end |
111 | 133 |
112 conn.write(format([[<stream:stream xmlns='jabber:server' xmlns:db='jabber:server:dialback' xmlns:stream='http://etherx.jabber.org/streams' from='%s' to='%s' version='1.0'>]], from_host, to_host)); | 134 conn.write(format([[<stream:stream xmlns='jabber:server' xmlns:db='jabber:server:dialback' xmlns:stream='http://etherx.jabber.org/streams' from='%s' to='%s' version='1.0'>]], from_host, to_host)); |
113 | 135 |
114 return host_session; | 136 return host_session; |
115 end | 137 end |
117 function streamopened(session, attr) | 139 function streamopened(session, attr) |
118 local send = session.sends2s; | 140 local send = session.sends2s; |
119 | 141 |
120 session.version = tonumber(attr.version) or 0; | 142 session.version = tonumber(attr.version) or 0; |
121 if session.version >= 1.0 and not (attr.to and attr.from) then | 143 if session.version >= 1.0 and not (attr.to and attr.from) then |
122 print("to: "..tostring(attr.to).." from: "..tostring(attr.from)); | 144 --print("to: "..tostring(attr.to).." from: "..tostring(attr.from)); |
123 --error(session.to_host.." failed to specify 'to' or 'from' hostname as per RFC"); | |
124 log("warn", (session.to_host or "(unknown)").." failed to specify 'to' or 'from' hostname as per RFC"); | 145 log("warn", (session.to_host or "(unknown)").." failed to specify 'to' or 'from' hostname as per RFC"); |
125 end | 146 end |
126 | 147 |
127 if session.direction == "incoming" then | 148 if session.direction == "incoming" then |
128 -- Send a reply stream header | 149 -- Send a reply stream header |
129 | 150 |
130 for k,v in pairs(attr) do print("", tostring(k), ":::", tostring(v)); end | 151 --for k,v in pairs(attr) do print("", tostring(k), ":::", tostring(v)); end |
131 | 152 |
132 session.to_host = attr.to; | 153 session.to_host = attr.to; |
133 session.from_host = attr.from; | 154 session.from_host = attr.from; |
134 | 155 |
135 session.streamid = uuid_gen(); | 156 session.streamid = uuid_gen(); |
136 print(session, session.from_host, "incoming s2s stream opened"); | 157 (session.log or log)("debug", "incoming s2s received <stream:stream>"); |
137 send("<?xml version='1.0'?>"); | 158 send("<?xml version='1.0'?>"); |
138 send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host }):top_tag()); | 159 send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', ["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host }):top_tag()); |
160 if session.to_host and not hosts[session.to_host] then | |
161 -- Attempting to connect to a host we don't serve | |
162 session:close("host-unknown"); | |
163 return; | |
164 end | |
165 if session.version >= 1.0 then | |
166 send(st.stanza("stream:features") | |
167 :tag("dialback", { xmlns='urn:xmpp:features:dialback' }):tag("optional"):up():up()); | |
168 end | |
139 elseif session.direction == "outgoing" then | 169 elseif session.direction == "outgoing" then |
140 -- If we are just using the connection for verifying dialback keys, we won't try and auth it | 170 -- If we are just using the connection for verifying dialback keys, we won't try and auth it |
141 if not attr.id then error("stream response did not give us a streamid!!!"); end | 171 if not attr.id then error("stream response did not give us a streamid!!!"); end |
142 session.streamid = attr.id; | 172 session.streamid = attr.id; |
143 | 173 |
145 initiate_dialback(session); | 175 initiate_dialback(session); |
146 else | 176 else |
147 mark_connected(session); | 177 mark_connected(session); |
148 end | 178 end |
149 end | 179 end |
150 --[[ | |
151 local features = {}; | |
152 modulemanager.fire_event("stream-features-s2s", session, features); | |
153 | |
154 send("<stream:features>"); | |
155 | |
156 for _, feature in ipairs(features) do | |
157 send(tostring(feature)); | |
158 end | |
159 | |
160 send("</stream:features>");]] | |
161 | 180 |
162 session.notopen = nil; | 181 session.notopen = nil; |
163 end | 182 end |
164 | 183 |
165 function initiate_dialback(session) | 184 function initiate_dialback(session) |
215 end | 234 end |
216 end | 235 end |
217 | 236 |
218 function destroy_session(session) | 237 function destroy_session(session) |
219 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); | 238 (session.log or log)("info", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)); |
239 | |
240 -- FIXME: Flush sendq here/report errors to originators | |
241 | |
220 if session.direction == "outgoing" then | 242 if session.direction == "outgoing" then |
221 hosts[session.from_host].s2sout[session.to_host] = nil; | 243 hosts[session.from_host].s2sout[session.to_host] = nil; |
222 end | 244 end |
223 session.conn = nil; | 245 |
224 session.disconnect = nil; | |
225 for k in pairs(session) do | 246 for k in pairs(session) do |
226 if k ~= "trace" then | 247 if k ~= "trace" then |
227 session[k] = nil; | 248 session[k] = nil; |
228 end | 249 end |
229 end | 250 end |