Software /
code /
prosody
Comparison
plugins/s2s/s2sout.lib.lua @ 4555:3dce04129693
s2smanager, mod_s2s, mod_s2s/s2sout: Split connection handling out of s2smanager into mod_s2s, and further split connection logic for s2sout to a module lib, s2sout.lib.lua
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Mon, 23 Jan 2012 16:28:20 +0000 |
child | 4562:b163c65379a4 |
child | 4569:34b1122126f6 |
comparison
equal
deleted
inserted
replaced
4554:6ceabde7af91 | 4555:3dce04129693 |
---|---|
1 -- Prosody IM | |
2 -- Copyright (C) 2008-2010 Matthew Wild | |
3 -- Copyright (C) 2008-2010 Waqas Hussain | |
4 -- | |
5 -- This project is MIT/X11 licensed. Please see the | |
6 -- COPYING file in the source package for more information. | |
7 -- | |
8 | |
9 --- Module containing all the logic for connecting to a remote server | |
10 | |
11 local initialize_filters = require "util.filters".initialize; | |
12 local idna_to_ascii = require "util.encodings".idna.to_ascii; | |
13 local add_task = require "util.timer".add_task; | |
14 local socket = require "socket"; | |
15 | |
16 local s2s_destroy_session = require "core.s2smanager".destroy_session; | |
17 | |
18 local s2sout = {}; | |
19 | |
20 local s2s_listener; | |
21 | |
22 function s2sout.set_listener(listener) | |
23 s2s_listener = listener; | |
24 end | |
25 | |
26 local function compare_srv_priorities(a,b) | |
27 return a.priority < b.priority or (a.priority == b.priority and a.weight > b.weight); | |
28 end | |
29 | |
30 local function session_open_stream(session, from, to) | |
31 session.sends2s(st.stanza("stream:stream", { | |
32 xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', | |
33 ["xmlns:stream"]='http://etherx.jabber.org/streams', | |
34 from=from, to=to, version='1.0', ["xml:lang"]='en'}):top_tag()); | |
35 end | |
36 | |
37 function s2sout.initiate_connection(host_session) | |
38 initialize_filters(host_session); | |
39 session.open_stream = session_open_stream; | |
40 | |
41 -- Kick the connection attempting machine into life | |
42 if not s2sout.attempt_connection(host_session) then | |
43 -- Intentionally not returning here, the | |
44 -- session is needed, connected or not | |
45 s2s_destroy_session(host_session); | |
46 end | |
47 | |
48 if not host_session.sends2s then | |
49 -- A sends2s which buffers data (until the stream is opened) | |
50 -- note that data in this buffer will be sent before the stream is authed | |
51 -- and will not be ack'd in any way, successful or otherwise | |
52 local buffer; | |
53 function host_session.sends2s(data) | |
54 if not buffer then | |
55 buffer = {}; | |
56 host_session.send_buffer = buffer; | |
57 end | |
58 log("debug", "Buffering data on unconnected s2sout to %s", to_host); | |
59 buffer[#buffer+1] = data; | |
60 log("debug", "Buffered item %d: %s", #buffer, tostring(data)); | |
61 end | |
62 end | |
63 end | |
64 | |
65 function s2sout.attempt_connection(host_session, err) | |
66 local from_host, to_host = host_session.from_host, host_session.to_host; | |
67 local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269; | |
68 | |
69 if not connect_host then | |
70 return false; | |
71 end | |
72 | |
73 if not err then -- This is our first attempt | |
74 log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host); | |
75 host_session.connecting = true; | |
76 local handle; | |
77 handle = adns.lookup(function (answer) | |
78 handle = nil; | |
79 host_session.connecting = nil; | |
80 if answer then | |
81 log("debug", to_host.." has SRV records, handling..."); | |
82 local srv_hosts = {}; | |
83 host_session.srv_hosts = srv_hosts; | |
84 for _, record in ipairs(answer) do | |
85 t_insert(srv_hosts, record.srv); | |
86 end | |
87 if #srv_hosts == 1 and srv_hosts[1].target == "." then | |
88 log("debug", to_host.." does not provide a XMPP service"); | |
89 s2s_destroy_session(host_session, err); -- Nothing to see here | |
90 return; | |
91 end | |
92 t_sort(srv_hosts, compare_srv_priorities); | |
93 | |
94 local srv_choice = srv_hosts[1]; | |
95 host_session.srv_choice = 1; | |
96 if srv_choice then | |
97 connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; | |
98 log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port); | |
99 end | |
100 else | |
101 log("debug", to_host.." has no SRV records, falling back to A"); | |
102 end | |
103 -- Try with SRV, or just the plain hostname if no SRV | |
104 local ok, err = s2sout.try_connect(host_session, connect_host, connect_port); | |
105 if not ok then | |
106 if not s2sout.attempt_connection(host_session, err) then | |
107 -- No more attempts will be made | |
108 s2s_destroy_session(host_session, err); | |
109 end | |
110 end | |
111 end, "_xmpp-server._tcp."..connect_host..".", "SRV"); | |
112 | |
113 return true; -- Attempt in progress | |
114 elseif host_session.ip_hosts then | |
115 return s2sout.try_connect(host_session, connect_host, connect_port, err); | |
116 elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV | |
117 host_session.srv_choice = host_session.srv_choice + 1; | |
118 local srv_choice = host_session.srv_hosts[host_session.srv_choice]; | |
119 connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; | |
120 host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port); | |
121 else | |
122 host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host)); | |
123 -- We're out of options | |
124 return false; | |
125 end | |
126 | |
127 if not (connect_host and connect_port) then | |
128 -- Likely we couldn't resolve DNS | |
129 log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host)); | |
130 return false; | |
131 end | |
132 | |
133 return s2sout.try_connect(host_session, connect_host, connect_port); | |
134 end | |
135 | |
136 function s2sout.try_next_ip(host_session) | |
137 host_session.connecting = nil; | |
138 host_session.ip_choice = host_session.ip_choice + 1; | |
139 local ip = host_session.ip_hosts[host_session.ip_choice]; | |
140 local ok, err= s2sout.make_connect(host_session, ip.ip, ip.port); | |
141 if not ok then | |
142 if not s2sout.attempt_connection(host_session, err or "closed") then | |
143 err = err and (": "..err) or ""; | |
144 s2s_destroy_session(host_session, "Connection failed"..err); | |
145 end | |
146 end | |
147 end | |
148 | |
149 function s2sout.try_connect(host_session, connect_host, connect_port, err) | |
150 host_session.connecting = true; | |
151 | |
152 if not err then | |
153 local IPs = {}; | |
154 host_session.ip_hosts = IPs; | |
155 local handle4, handle6; | |
156 local has_other = false; | |
157 | |
158 if not sources then | |
159 sources = {}; | |
160 for i, source in ipairs(cfg_sources) do | |
161 if source == "*" then | |
162 sources[i] = new_ip("0.0.0.0", "IPv4"); | |
163 else | |
164 sources[i] = new_ip(source, (source:find(":") and "IPv6") or "IPv4"); | |
165 end | |
166 end | |
167 end | |
168 | |
169 handle4 = adns.lookup(function (reply, err) | |
170 handle4 = nil; | |
171 | |
172 -- COMPAT: This is a compromise for all you CNAME-(ab)users :) | |
173 if not (reply and reply[#reply] and reply[#reply].a) then | |
174 local count = max_dns_depth; | |
175 reply = dns.peek(connect_host, "CNAME", "IN"); | |
176 while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do | |
177 log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count); | |
178 reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN"); | |
179 count = count - 1; | |
180 end | |
181 end | |
182 -- end of CNAME resolving | |
183 | |
184 if reply and reply[#reply] and reply[#reply].a then | |
185 for _, ip in ipairs(reply) do | |
186 log("debug", "DNS reply for %s gives us %s", connect_host, ip.a); | |
187 IPs[#IPs+1] = new_ip(ip.a, "IPv4"); | |
188 end | |
189 end | |
190 | |
191 if has_other then | |
192 if #IPs > 0 then | |
193 rfc3484_dest(host_session.ip_hosts, sources); | |
194 for i = 1, #IPs do | |
195 IPs[i] = {ip = IPs[i], port = connect_port}; | |
196 end | |
197 host_session.ip_choice = 0; | |
198 s2sout.try_next_ip(host_session); | |
199 else | |
200 log("debug", "DNS lookup failed to get a response for %s", connect_host); | |
201 host_session.ip_hosts = nil; | |
202 if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can | |
203 log("debug", "No other records to try for %s - destroying", host_session.to_host); | |
204 err = err and (": "..err) or ""; | |
205 s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't | |
206 end | |
207 end | |
208 else | |
209 has_other = true; | |
210 end | |
211 end, connect_host, "A", "IN"); | |
212 | |
213 handle6 = adns.lookup(function (reply, err) | |
214 handle6 = nil; | |
215 | |
216 if reply and reply[#reply] and reply[#reply].aaaa then | |
217 for _, ip in ipairs(reply) do | |
218 log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa); | |
219 IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6"); | |
220 end | |
221 end | |
222 | |
223 if has_other then | |
224 if #IPs > 0 then | |
225 rfc3484_dest(host_session.ip_hosts, sources); | |
226 for i = 1, #IPs do | |
227 IPs[i] = {ip = IPs[i], port = connect_port}; | |
228 end | |
229 host_session.ip_choice = 0; | |
230 s2sout.try_next_ip(host_session); | |
231 else | |
232 log("debug", "DNS lookup failed to get a response for %s", connect_host); | |
233 host_session.ip_hosts = nil; | |
234 if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can | |
235 log("debug", "No other records to try for %s - destroying", host_session.to_host); | |
236 err = err and (": "..err) or ""; | |
237 s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't | |
238 end | |
239 end | |
240 else | |
241 has_other = true; | |
242 end | |
243 end, connect_host, "AAAA", "IN"); | |
244 | |
245 return true; | |
246 elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try | |
247 s2sout.try_next_ip(host_session); | |
248 else | |
249 host_session.ip_hosts = nil; | |
250 if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can | |
251 log("debug", "No other records to try for %s - destroying", host_session.to_host); | |
252 err = err and (": "..err) or ""; | |
253 s2s_destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't | |
254 return false; | |
255 end | |
256 end | |
257 | |
258 return true; | |
259 end | |
260 | |
261 function s2sout.make_connect(host_session, connect_host, connect_port) | |
262 (host_session.log or log)("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port); | |
263 -- Ok, we're going to try to connect | |
264 | |
265 local from_host, to_host = host_session.from_host, host_session.to_host; | |
266 | |
267 local conn, handler; | |
268 if connect_host.proto == "IPv4" then | |
269 conn, handler = socket.tcp(); | |
270 else | |
271 conn, handler = socket.tcp6(); | |
272 end | |
273 | |
274 if not conn then | |
275 log("warn", "Failed to create outgoing connection, system error: %s", handler); | |
276 return false, handler; | |
277 end | |
278 | |
279 conn:settimeout(0); | |
280 local success, err = conn:connect(connect_host.addr, connect_port); | |
281 if not success and err ~= "timeout" then | |
282 log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err); | |
283 return false, err; | |
284 end | |
285 | |
286 conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, "*a"); | |
287 host_session.conn = conn; | |
288 | |
289 local filter = initialize_filters(host_session); | |
290 local w, log = conn.write, host_session.log; | |
291 host_session.sends2s = function (t) | |
292 log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?")); | |
293 if t.name then | |
294 t = filter("stanzas/out", t); | |
295 end | |
296 if t then | |
297 t = filter("bytes/out", tostring(t)); | |
298 if t then | |
299 return w(conn, tostring(t)); | |
300 end | |
301 end | |
302 end | |
303 | |
304 -- Register this outgoing connection so that xmppserver_listener knows about it | |
305 -- otherwise it will assume it is a new incoming connection | |
306 s2s_listener.register_outgoing(conn, host_session); | |
307 | |
308 host_session:open_stream(from_host, to_host); | |
309 | |
310 log("debug", "Connection attempt in progress..."); | |
311 return true; | |
312 end | |
313 | |
314 return s2sout; |