Software / code / prosody
Comparison
plugins/mod_s2s/s2sout.lib.lua @ 4814:474684c07a3a
Rename plugins/s2s/ to plugins/mod_s2s/
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Fri, 04 May 2012 00:05:15 +0100 |
| parent | 4791:plugins/s2s/s2sout.lib.lua@c01066f27a5d |
| child | 4832:6b3aec1e0d9f |
comparison
equal
deleted
inserted
replaced
| 4813:77da9671ac39 | 4814:474684c07a3a |
|---|---|
| 1 -- Prosody IM | |
| 2 -- Copyright (C) 2008-2010 Matthew Wild | |
| 3 -- Copyright (C) 2008-2010 Waqas Hussain | |
| 4 -- | |
| 5 -- This project is MIT/X11 licensed. Please see the | |
| 6 -- COPYING file in the source package for more information. | |
| 7 -- | |
| 8 | |
| 9 --- Module containing all the logic for connecting to a remote server | |
| 10 | |
| 11 local portmanager = require "core.portmanager"; | |
| 12 local wrapclient = require "net.server".wrapclient; | |
| 13 local initialize_filters = require "util.filters".initialize; | |
| 14 local idna_to_ascii = require "util.encodings".idna.to_ascii; | |
| 15 local new_ip = require "util.ip".new_ip; | |
| 16 local rfc3484_dest = require "util.rfc3484".destination; | |
| 17 local socket = require "socket"; | |
| 18 local adns = require "net.adns"; | |
| 19 local dns = require "net.dns"; | |
| 20 local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs; | |
| 21 local st = require "util.stanza"; | |
| 22 | |
| 23 local s2s_destroy_session = require "core.s2smanager".destroy_session; | |
| 24 | |
| 25 local log = module._log; | |
| 26 | |
| 27 local sources = {}; | |
| 28 | |
| 29 local max_dns_depth = module:get_option_number("dns_max_depth", 3); | |
| 30 | |
| 31 local s2sout = {}; | |
| 32 | |
| 33 local s2s_listener; | |
| 34 | |
| 35 | |
| 36 function s2sout.set_listener(listener) | |
| 37 s2s_listener = listener; | |
| 38 end | |
| 39 | |
| 40 local function compare_srv_priorities(a,b) | |
| 41 return a.priority < b.priority or (a.priority == b.priority and a.weight > b.weight); | |
| 42 end | |
| 43 | |
| 44 local function session_open_stream(session, from, to) | |
| 45 session.sends2s(st.stanza("stream:stream", { | |
| 46 xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback', | |
| 47 ["xmlns:stream"]='http://etherx.jabber.org/streams', | |
| 48 from=from, to=to, version='1.0', ["xml:lang"]='en'}):top_tag()); | |
| 49 end | |
| 50 | |
| 51 function s2sout.initiate_connection(host_session) | |
| 52 initialize_filters(host_session); | |
| 53 host_session.open_stream = session_open_stream; | |
| 54 | |
| 55 -- Kick the connection attempting machine into life | |
| 56 if not s2sout.attempt_connection(host_session) then | |
| 57 -- Intentionally not returning here, the | |
| 58 -- session is needed, connected or not | |
| 59 s2s_destroy_session(host_session); | |
| 60 end | |
| 61 | |
| 62 if not host_session.sends2s then | |
| 63 -- A sends2s which buffers data (until the stream is opened) | |
| 64 -- note that data in this buffer will be sent before the stream is authed | |
| 65 -- and will not be ack'd in any way, successful or otherwise | |
| 66 local buffer; | |
| 67 function host_session.sends2s(data) | |
| 68 if not buffer then | |
| 69 buffer = {}; | |
| 70 host_session.send_buffer = buffer; | |
| 71 end | |
| 72 log("debug", "Buffering data on unconnected s2sout to %s", tostring(host_session.to_host)); | |
| 73 buffer[#buffer+1] = data; | |
| 74 log("debug", "Buffered item %d: %s", #buffer, tostring(data)); | |
| 75 end | |
| 76 end | |
| 77 end | |
| 78 | |
| 79 function s2sout.attempt_connection(host_session, err) | |
| 80 local from_host, to_host = host_session.from_host, host_session.to_host; | |
| 81 local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269; | |
| 82 | |
| 83 if not connect_host then | |
| 84 return false; | |
| 85 end | |
| 86 | |
| 87 if not err then -- This is our first attempt | |
| 88 log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host); | |
| 89 host_session.connecting = true; | |
| 90 local handle; | |
| 91 handle = adns.lookup(function (answer) | |
| 92 handle = nil; | |
| 93 host_session.connecting = nil; | |
| 94 if answer then | |
| 95 log("debug", to_host.." has SRV records, handling..."); | |
| 96 local srv_hosts = {}; | |
| 97 host_session.srv_hosts = srv_hosts; | |
| 98 for _, record in ipairs(answer) do | |
| 99 t_insert(srv_hosts, record.srv); | |
| 100 end | |
| 101 if #srv_hosts == 1 and srv_hosts[1].target == "." then | |
| 102 log("debug", to_host.." does not provide a XMPP service"); | |
| 103 s2s_destroy_session(host_session, err); -- Nothing to see here | |
| 104 return; | |
| 105 end | |
| 106 t_sort(srv_hosts, compare_srv_priorities); | |
| 107 | |
| 108 local srv_choice = srv_hosts[1]; | |
| 109 host_session.srv_choice = 1; | |
| 110 if srv_choice then | |
| 111 connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; | |
| 112 log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port); | |
| 113 end | |
| 114 else | |
| 115 log("debug", to_host.." has no SRV records, falling back to A"); | |
| 116 end | |
| 117 -- Try with SRV, or just the plain hostname if no SRV | |
| 118 local ok, err = s2sout.try_connect(host_session, connect_host, connect_port); | |
| 119 if not ok then | |
| 120 if not s2sout.attempt_connection(host_session, err) then | |
| 121 -- No more attempts will be made | |
| 122 s2s_destroy_session(host_session, err); | |
| 123 end | |
| 124 end | |
| 125 end, "_xmpp-server._tcp."..connect_host..".", "SRV"); | |
| 126 | |
| 127 return true; -- Attempt in progress | |
| 128 elseif host_session.ip_hosts then | |
| 129 return s2sout.try_connect(host_session, connect_host, connect_port, err); | |
| 130 elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV | |
| 131 host_session.srv_choice = host_session.srv_choice + 1; | |
| 132 local srv_choice = host_session.srv_hosts[host_session.srv_choice]; | |
| 133 connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; | |
| 134 host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port); | |
| 135 else | |
| 136 host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host)); | |
| 137 -- We're out of options | |
| 138 return false; | |
| 139 end | |
| 140 | |
| 141 if not (connect_host and connect_port) then | |
| 142 -- Likely we couldn't resolve DNS | |
| 143 log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host)); | |
| 144 return false; | |
| 145 end | |
| 146 | |
| 147 return s2sout.try_connect(host_session, connect_host, connect_port); | |
| 148 end | |
| 149 | |
| 150 function s2sout.try_next_ip(host_session) | |
| 151 host_session.connecting = nil; | |
| 152 host_session.ip_choice = host_session.ip_choice + 1; | |
| 153 local ip = host_session.ip_hosts[host_session.ip_choice]; | |
| 154 local ok, err= s2sout.make_connect(host_session, ip.ip, ip.port); | |
| 155 if not ok then | |
| 156 if not s2sout.attempt_connection(host_session, err or "closed") then | |
| 157 err = err and (": "..err) or ""; | |
| 158 s2s_destroy_session(host_session, "Connection failed"..err); | |
| 159 end | |
| 160 end | |
| 161 end | |
| 162 | |
| 163 function s2sout.try_connect(host_session, connect_host, connect_port, err) | |
| 164 host_session.connecting = true; | |
| 165 | |
| 166 if not err then | |
| 167 local IPs = {}; | |
| 168 host_session.ip_hosts = IPs; | |
| 169 local handle4, handle6; | |
| 170 local has_other = false; | |
| 171 | |
| 172 handle4 = adns.lookup(function (reply, err) | |
| 173 handle4 = nil; | |
| 174 | |
| 175 -- COMPAT: This is a compromise for all you CNAME-(ab)users :) | |
| 176 if not (reply and reply[#reply] and reply[#reply].a) then | |
| 177 local count = max_dns_depth; | |
| 178 reply = dns.peek(connect_host, "CNAME", "IN"); | |
| 179 while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do | |
| 180 log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count); | |
| 181 reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN"); | |
| 182 count = count - 1; | |
| 183 end | |
| 184 end | |
| 185 -- end of CNAME resolving | |
| 186 | |
| 187 if reply and reply[#reply] and reply[#reply].a then | |
| 188 for _, ip in ipairs(reply) do | |
| 189 log("debug", "DNS reply for %s gives us %s", connect_host, ip.a); | |
| 190 IPs[#IPs+1] = new_ip(ip.a, "IPv4"); | |
| 191 end | |
| 192 end | |
| 193 | |
| 194 if has_other then | |
| 195 if #IPs > 0 then | |
| 196 rfc3484_dest(host_session.ip_hosts, sources); | |
| 197 for i = 1, #IPs do | |
| 198 IPs[i] = {ip = IPs[i], port = connect_port}; | |
| 199 end | |
| 200 host_session.ip_choice = 0; | |
| 201 s2sout.try_next_ip(host_session); | |
| 202 else | |
| 203 log("debug", "DNS lookup failed to get a response for %s", connect_host); | |
| 204 host_session.ip_hosts = nil; | |
| 205 if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can | |
| 206 log("debug", "No other records to try for %s - destroying", host_session.to_host); | |
| 207 err = err and (": "..err) or ""; | |
| 208 s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't | |
| 209 end | |
| 210 end | |
| 211 else | |
| 212 has_other = true; | |
| 213 end | |
| 214 end, connect_host, "A", "IN"); | |
| 215 | |
| 216 handle6 = adns.lookup(function (reply, err) | |
| 217 handle6 = nil; | |
| 218 | |
| 219 if reply and reply[#reply] and reply[#reply].aaaa then | |
| 220 for _, ip in ipairs(reply) do | |
| 221 log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa); | |
| 222 IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6"); | |
| 223 end | |
| 224 end | |
| 225 | |
| 226 if has_other then | |
| 227 if #IPs > 0 then | |
| 228 rfc3484_dest(host_session.ip_hosts, sources); | |
| 229 for i = 1, #IPs do | |
| 230 IPs[i] = {ip = IPs[i], port = connect_port}; | |
| 231 end | |
| 232 host_session.ip_choice = 0; | |
| 233 s2sout.try_next_ip(host_session); | |
| 234 else | |
| 235 log("debug", "DNS lookup failed to get a response for %s", connect_host); | |
| 236 host_session.ip_hosts = nil; | |
| 237 if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can | |
| 238 log("debug", "No other records to try for %s - destroying", host_session.to_host); | |
| 239 err = err and (": "..err) or ""; | |
| 240 s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't | |
| 241 end | |
| 242 end | |
| 243 else | |
| 244 has_other = true; | |
| 245 end | |
| 246 end, connect_host, "AAAA", "IN"); | |
| 247 | |
| 248 return true; | |
| 249 elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try | |
| 250 s2sout.try_next_ip(host_session); | |
| 251 else | |
| 252 host_session.ip_hosts = nil; | |
| 253 if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can | |
| 254 log("debug", "No other records to try for %s - destroying", host_session.to_host); | |
| 255 err = err and (": "..err) or ""; | |
| 256 s2s_destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't | |
| 257 return false; | |
| 258 end | |
| 259 end | |
| 260 | |
| 261 return true; | |
| 262 end | |
| 263 | |
| 264 function s2sout.make_connect(host_session, connect_host, connect_port) | |
| 265 (host_session.log or log)("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port); | |
| 266 -- Ok, we're going to try to connect | |
| 267 | |
| 268 local from_host, to_host = host_session.from_host, host_session.to_host; | |
| 269 | |
| 270 local conn, handler; | |
| 271 if connect_host.proto == "IPv4" then | |
| 272 conn, handler = socket.tcp(); | |
| 273 else | |
| 274 if not socket.tcp6 then | |
| 275 log("warn", "Could not connect to "..to_host..". Your version of lua-socket does not support IPv6"); | |
| 276 return false, "no-ipv6"; | |
| 277 end | |
| 278 conn, handler = socket.tcp6(); | |
| 279 end | |
| 280 | |
| 281 if not conn then | |
| 282 log("warn", "Failed to create outgoing connection, system error: %s", handler); | |
| 283 return false, handler; | |
| 284 end | |
| 285 | |
| 286 conn:settimeout(0); | |
| 287 local success, err = conn:connect(connect_host.addr, connect_port); | |
| 288 if not success and err ~= "timeout" then | |
| 289 log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err); | |
| 290 return false, err; | |
| 291 end | |
| 292 | |
| 293 conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, "*a"); | |
| 294 host_session.conn = conn; | |
| 295 | |
| 296 local filter = initialize_filters(host_session); | |
| 297 local w, log = conn.write, host_session.log; | |
| 298 host_session.sends2s = function (t) | |
| 299 log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?")); | |
| 300 if t.name then | |
| 301 t = filter("stanzas/out", t); | |
| 302 end | |
| 303 if t then | |
| 304 t = filter("bytes/out", tostring(t)); | |
| 305 if t then | |
| 306 return w(conn, tostring(t)); | |
| 307 end | |
| 308 end | |
| 309 end | |
| 310 | |
| 311 -- Register this outgoing connection so that xmppserver_listener knows about it | |
| 312 -- otherwise it will assume it is a new incoming connection | |
| 313 s2s_listener.register_outgoing(conn, host_session); | |
| 314 | |
| 315 host_session:open_stream(from_host, to_host); | |
| 316 | |
| 317 log("debug", "Connection attempt in progress..."); | |
| 318 return true; | |
| 319 end | |
| 320 | |
| 321 module:hook_global("service-added", function (event) | |
| 322 if event.name ~= "s2s" then return end | |
| 323 | |
| 324 local s2s_sources = portmanager.get_active_services():get("s2s"); | |
| 325 if not s2s_sources then | |
| 326 module:log("warn", "s2s not listening on any ports, outgoing connections may fail"); | |
| 327 return; | |
| 328 end | |
| 329 for source, _ in pairs(s2s_sources) do | |
| 330 if source == "*" or source == "0.0.0.0" then | |
| 331 if not socket.local_addresses then | |
| 332 sources[#sources + 1] = new_ip("0.0.0.0", "IPv4"); | |
| 333 else | |
| 334 for _, addr in ipairs(socket.local_addresses("ipv4", true)) do | |
| 335 sources[#sources + 1] = new_ip(addr, "IPv4"); | |
| 336 end | |
| 337 end | |
| 338 elseif source == "::" then | |
| 339 if not socket.local_addresses then | |
| 340 sources[#sources + 1] = new_ip("::", "IPv6"); | |
| 341 else | |
| 342 for _, addr in ipairs(socket.local_addresses("ipv6", true)) do | |
| 343 sources[#sources + 1] = new_ip(addr, "IPv6"); | |
| 344 end | |
| 345 end | |
| 346 else | |
| 347 sources[#sources + 1] = new_ip(source, (source:find(":") and "IPv6") or "IPv4"); | |
| 348 end | |
| 349 end | |
| 350 end); | |
| 351 | |
| 352 return s2sout; |