Software /
code /
prosody
Comparison
plugins/s2s/mod_s2s.lua @ 4555:3dce04129693
s2smanager, mod_s2s, mod_s2s/s2sout: Split connection handling out of s2smanager into mod_s2s, and further split connection logic for s2sout to a module lib, s2sout.lib.lua
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Mon, 23 Jan 2012 16:28:20 +0000 |
child | 4558:d312c8605ed7 |
child | 4568:aae7a62671de |
comparison
equal
deleted
inserted
replaced
4554:6ceabde7af91 | 4555:3dce04129693 |
---|---|
1 -- Prosody IM | |
2 -- Copyright (C) 2008-2010 Matthew Wild | |
3 -- Copyright (C) 2008-2010 Waqas Hussain | |
4 -- | |
5 -- This project is MIT/X11 licensed. Please see the | |
6 -- COPYING file in the source package for more information. | |
7 -- | |
8 | |
9 module:set_global(); | |
10 | |
11 local tostring, type = tostring, type; | |
12 local xpcall, traceback = xpcall, debug.traceback; | |
13 | |
14 local add_task = require "util.timer".add_task; | |
15 local st = require "util.stanza"; | |
16 local initialize_filters = require "util.filters".initialize; | |
17 local new_xmpp_stream = require "util.xmppstream".new; | |
18 local s2s_new_incoming = require "core.s2smanager".new_incoming; | |
19 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; | |
20 local s2s_destroy_session = require "core.s2smanager".destroy_session; | |
21 | |
22 local s2sout = module:require("s2sout"); | |
23 | |
24 local connect_timeout = module:get_option_number("s2s_timeout", 60); | |
25 | |
26 local sessions = module:shared("sessions"); | |
27 | |
--- Handle stanzas to remote domains

-- Top-level stanza names that receive an error reply when they cannot be delivered
local bouncy_stanzas = { message = true, presence = true, iq = true };

--- Send error replies for every stanza still queued on an outgoing session.
-- Each queue entry is a two-element array: the serialized stanza and either a
-- reply stanza or `false` (no reply wanted). Replies are turned into
-- <remote-server-not-found/> errors and handed to core_process_stanza with a
-- dummy origin. The queue is emptied and removed from the session.
-- @param session session table whose `sendq` is to be bounced
-- @param reason optional string appended to the human-readable error text
local function bounce_sendq(session, reason)
	local sendq = session.sendq;
	if not sendq then return; end
	session.log("info", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
	-- Stand-in origin for the generated replies; nothing should ever be sent back to it
	local dummy = {
		type = "s2sin";
		send = function(s)
			-- Uses the file-local `traceback` (debug.traceback); the previous
			-- `get_traceback` was never defined and would raise here.
			(session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", traceback());
		end;
		dummy = true;
	};
	for i, data in ipairs(sendq) do
		local reply = data[2];
		-- Only bounce plain (un-namespaced) message/presence/iq stanzas
		if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
			reply.attr.type = "error";
			reply:tag("error", {type = "cancel"})
				:tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
			if reason then
				reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
					:text("Server-to-server connection failed: "..reason):up();
			end
			core_process_stanza(dummy, reply);
		end
		sendq[i] = nil;
	end
	session.sendq = nil;
end
58 | |
--- Deliver a stanza to a remote host over s2s.
-- Reuses an existing outgoing session when there is one (queueing the stanza
-- if that session is not authenticated yet), otherwise creates a new outgoing
-- session, queues the stanza on it and starts the connection.
-- @param from_host local host the stanza is sent from
-- @param to_host remote host the stanza is addressed to
-- @param stanza the stanza to deliver
-- @return true if the stanza was sent or queued, false otherwise
function send_to_host(from_host, to_host, stanza)
	if not hosts[from_host] then
		log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
		return false;
	end

	-- Queue entry: { serialized stanza, reply stanza or false }.
	-- No reply is prepared for "error" and "result" stanzas.
	local function new_queue_entry()
		return { tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza) };
	end

	local host = hosts[from_host].s2sout[to_host];
	if host then
		-- We have a connection to this host already
		if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
			(host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);

			-- Queue stanza until we are able to send it.
			-- table.insert replaces the undefined `t_insert` used previously.
			if not host.sendq then host.sendq = {}; end
			table.insert(host.sendq, new_queue_entry());
			(host.log or log)("debug", "stanza [%s] queued ", stanza.name);
		elseif host.type == "local" or host.type == "component" then
			log("error", "Trying to send a stanza to ourselves??")
			-- `traceback` is the file-local debug.traceback; `get_traceback` was undefined
			log("error", "Traceback: %s", traceback());
			log("error", "Stanza: %s", tostring(stanza));
			return false;
		else
			(host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host);
			-- FIXME
			if host.from_host ~= from_host then
				log("error", "WARNING! This might, possibly, be a bug, but it might not...");
				log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
			end
			host.sends2s(stanza);
			(host.log or log)("debug", "stanza sent over "..host.type);
		end
	else
		log("debug", "opening a new outgoing connection for this stanza");
		local host_session = s2s_new_outgoing(from_host, to_host);

		-- Store in buffer
		host_session.bounce_sendq = bounce_sendq;
		host_session.sendq = { new_queue_entry() };
		log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
		if (not host_session.connecting) and (not host_session.conn) then
			log("warn", "Connection to %s failed already, destroying session...", to_host);
			if not s2s_destroy_session(host_session, "Connection failed") then
				-- Already destroyed, we need to bounce our stanza
				host_session:bounce_sendq(host_session.destruction_reason);
			end
			return false;
		end
		s2sout.initiate_connection(host_session);
	end
	return true;
end
109 | |
-- Hand every stanza routed to a remote domain over to send_to_host
module:hook("route/remote", function (event)
	local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
	return send_to_host(from_host, to_host, stanza);
end);
113 | |
--- Helper to check that a session peer's certificate is valid
-- Stores the outcome on the session: `cert_chain_status` becomes "valid" or
-- "invalid"; when the chain is valid and a peer hostname is known,
-- `cert_identity_status` is set the same way. Nothing is changed when the
-- socket offers no peer certificate.
-- NOTE(review): `cert_verify_identity` has no visible definition in this file - confirm it is in scope.
local function check_cert_status(session)
	local sock = session.conn:socket();
	local peer_cert = sock.getpeercertificate and sock:getpeercertificate();
	if not peer_cert then return; end

	-- Is there any interest in printing out all/the number of errors here?
	local chain_valid = sock:getpeerverification();
	local slog = session.log or log;
	if not chain_valid then
		slog("debug", "certificate chain validation result: invalid");
		session.cert_chain_status = "invalid";
		return;
	end

	slog("debug", "certificate chain validation result: valid");
	session.cert_chain_status = "valid";

	-- For incoming streams the peer is the sender; falls back to to_host otherwise
	local peer_host = session.direction == "incoming" and session.from_host or session.to_host;

	-- We'll go ahead and verify the asserted identity if the
	-- connecting server specified one.
	if not peer_host then return; end
	session.cert_identity_status = cert_verify_identity(peer_host, "xmpp-server", peer_cert) and "valid" or "invalid";
end
146 | |
--- XMPP stream event handlers

local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza };

local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";

--- Called when the peer's opening <stream:stream> tag has been received.
-- Incoming sessions: records the addressed hosts, rejects unknown or
-- s2s-disabled hosts, then replies with our own stream header and (for
-- version >= 1.0) stream features.
-- Outgoing sessions: records the stream id, flushes the pre-auth send buffer
-- and, for pre-1.0 peers, proceeds straight to dialback.
-- @param session the s2s session the stream belongs to
-- @param attr attributes of the received stream header
function stream_callbacks.streamopened(session, attr)
	local send = session.sends2s;

	-- TODO: #29: SASL/TLS on s2s streams
	session.version = tonumber(attr.version) or 0;

	-- TODO: Rename session.secure to session.encrypted
	if session.secure == false then
		session.secure = true;
	end

	if session.direction == "incoming" then
		-- Send a reply stream header
		-- NOTE(review): `nameprep` and `uuid_gen` have no visible definition in this file - confirm they are in scope.
		session.to_host = attr.to and nameprep(attr.to);
		session.from_host = attr.from and nameprep(attr.from);

		session.streamid = uuid_gen();
		(session.log or log)("debug", "Incoming s2s received <stream:stream>");
		if session.to_host then
			if not hosts[session.to_host] then
				-- Attempting to connect to a host we don't serve
				session:close({
					condition = "host-unknown";
					text = "This host does not serve "..session.to_host
				});
				return;
			elseif hosts[session.to_host].disallow_s2s then
				-- Attempting to connect to a host that disallows s2s
				session:close({
					condition = "policy-violation";
					text = "Server-to-server communication is not allowed to this host";
				});
				return;
			end
		end

		if session.secure and not session.cert_chain_status then check_cert_status(session); end

		send("<?xml version='1.0'?>");
		-- Built via st.stanza: the bare `stanza(...)` used before was an undefined global
		send(st.stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
				["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host, to=session.from_host, version=(session.version > 0 and "1.0" or nil) }):top_tag());
		if session.version >= 1.0 then
			local features = st.stanza("stream:features");

			if session.to_host then
				hosts[session.to_host].events.fire_event("s2s-stream-features", { origin = session, features = features });
			else
				(session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", session.from_host or "unknown host");
			end

			log("debug", "Sending stream features: %s", tostring(features));
			send(features);
		end
	elseif session.direction == "outgoing" then
		-- If we are just using the connection for verifying dialback keys, we won't try and auth it
		if not attr.id then error("stream response did not give us a streamid!!!"); end
		session.streamid = attr.id;

		if session.secure and not session.cert_chain_status then check_cert_status(session); end

		-- Send unauthed buffer
		-- (stanzas which are fine to send before dialback)
		-- Note that this is *not* the stanza queue (which
		-- we can only send if auth succeeds) :)
		local send_buffer = session.send_buffer;
		if send_buffer and #send_buffer > 0 then
			log("debug", "Sending s2s send_buffer now...");
			for i, data in ipairs(send_buffer) do
				session.sends2s(tostring(data));
				send_buffer[i] = nil;
			end
		end
		session.send_buffer = nil;

		-- If server is pre-1.0, don't wait for features, just do dialback
		-- NOTE(review): `initiate_dialback` and `s2s_mark_connected` have no visible definition in this file - confirm they are in scope.
		if session.version < 1.0 then
			if not session.dialback_verifying then
				log("debug", "Initiating dialback...");
				initiate_dialback(session);
			else
				s2s_mark_connected(session);
			end
		end
	end
	session.notopen = nil;
end
239 | |
--- Called when the peer sends its closing </stream:stream> tag; closes our side too.
function stream_callbacks.streamclosed(session)
	local log = session.log or log;
	log("debug", "Received </stream:stream>");
	session:close();
end
244 | |
--- Called when the underlying connection of a session has gone away.
-- For a real error (anything other than "closed") s2sout gets a chance to try
-- another connection target; otherwise the session is unregistered and destroyed.
-- @param session the affected s2s session
-- @param err disconnect reason, may be nil
-- @return true when the session is kept alive for another attempt, nothing otherwise
function stream_callbacks.streamdisconnected(session, err)
	-- Resolve the logger at call time, exactly as each log statement did before
	local function slog(...)
		return (session.log or log)(...);
	end

	local is_failure = err and err ~= "closed";
	if is_failure then
		slog("debug", "s2s connection attempt failed: %s", err);
		local retrying = s2sout.attempt_connection(session, err);
		if retrying then
			slog("debug", "...so we're going to try another target");
			return true; -- Session lives for now
		end
	end

	local from_name, to_name = tostring(session.from_host), tostring(session.to_host);
	slog("info", "s2s disconnected: %s->%s (%s)", from_name, to_name, tostring(err or "closed"));
	sessions[session.conn] = nil;
	s2s_destroy_session(session, err);
end
257 | |
--- Called by the stream parser when something goes wrong.
-- "no-stream" and "parse-error" close the session with a stream error of our
-- own; "stream-error" means the peer sent <stream:error>, whose condition and
-- optional text are extracted, logged and used as the remote close reason.
-- Any other error name is ignored.
-- @param session the affected s2s session
-- @param error error name reported by the parser
-- @param data for "stream-error": the received error element
function stream_callbacks.error(session, error, data)
	if error == "no-stream" then
		session:close("invalid-namespace");
		return;
	end

	if error == "parse-error" then
		session.log("debug", "Server-to-server XML parse error: %s", tostring(error));
		session:close("not-well-formed");
		return;
	end

	if error ~= "stream-error" then
		return;
	end

	-- Scan the children in the stream-errors namespace for a condition and a text
	local condition, text = "undefined-condition", nil;
	for child in data:children() do
		if child.attr.xmlns == xmlns_xmpp_streams then
			if child.name == "text" then
				text = child:get_text();
			else
				condition = child.name;
			end
			-- Stop as soon as both pieces have been found
			if condition ~= "undefined-condition" and text then
				break;
			end
		end
	end

	local description = condition;
	if text then
		description = description .. " ("..text..")";
	end
	session.log("info", "Session closed by remote with error: %s", description);
	session:close(nil, description);
end
283 | |
-- Error handler for xpcall below: logs the error together with a traceback
local function handleerr(err)
	log("error", "Traceback[s2s]: %s: %s", tostring(err), traceback());
end

--- Process one stanza received on an s2s stream.
-- Runs the stanza through the session's "stanzas/in" filter and, if the filter
-- lets it through, passes it to core_process_stanza under xpcall.
-- @return the results of xpcall, or nothing when the filter dropped the stanza
function stream_callbacks.handlestanza(session, stanza)
	local attr = stanza.attr;
	if attr.xmlns == "jabber:client" then --COMPAT: Prosody pre-0.6.2 may send jabber:client
		attr.xmlns = nil;
	end

	local filtered = session.filter("stanzas/in", stanza);
	if not filtered then
		return;
	end
	return xpcall(function () return core_process_stanza(session, filtered) end, handleerr);
end
294 | |
local listener = { default_port = 5269, default_mode = "*a", default_interface = "*" };

--- Session methods
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };

--- Close an s2s session, optionally sending a stream error first.
-- Does nothing when the session has no connection.
-- @param session the session to close
-- @param reason nil, a stream error condition name (string), a table with
--   `condition` (and optional `text` / `extra`), or a ready-made stanza
-- @param remote_reason optional reason reported to the disconnect handler in
--   preference to one derived from `reason`
local function session_close(session, reason, remote_reason)
	local log = session.log or log;
	if not session.conn then
		return;
	end

	-- A stream header must precede anything else we send
	if session.notopen then
		session.sends2s("<?xml version='1.0'?>");
		session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
	end

	local reason_type = reason and type(reason);
	if reason_type == "string" then -- assume stream error
		log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, reason);
		session.sends2s(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
	elseif reason_type == "table" then
		if reason.condition then
			local error_stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up();
			if reason.text then
				error_stanza:tag("text", stream_xmlns_attr):text(reason.text):up();
			end
			if reason.extra then
				error_stanza:add_child(reason.extra);
			end
			log("info", "Disconnecting %s[%s], <stream:error> is: %s", session.host or "(unknown host)", session.type, tostring(error_stanza));
			session.sends2s(error_stanza);
		elseif reason.name then -- a stanza
			log("info", "Disconnecting %s->%s[%s], <stream:error> is: %s", session.from_host or "(unknown host)", session.to_host or "(unknown host)", session.type, tostring(reason));
			session.sends2s(reason);
		end
	end

	session.sends2s("</stream:stream>");
	if session.notopen or not session.conn:close() then
		session.conn:close(true); -- Force FIXME: timer?
	end
	session.conn:close();

	local conn = session.conn;
	local disconnect_reason = remote_reason or (reason and (reason.text or reason.condition)) or reason or "stream closed";
	listener.ondisconnect(conn, disconnect_reason);
end
336 | |
-- Session initialization logic shared by incoming and outgoing
-- Attaches an XML stream parser plus the `reset_stream`, `data`, `close` and
-- `dispatch_stanza` members to the session, and schedules a timer that closes
-- the session if it has not finished connecting after `connect_timeout`.
local function initialize_session(session)
	local xml_stream = new_xmpp_stream(session, stream_callbacks);
	session.stream = xml_stream;

	session.notopen = true;

	session.reset_stream = function ()
		session.notopen = true;
		session.stream:reset();
	end

	local run_filter = session.filter;
	-- Feed raw bytes from the connection into the stream parser
	session.data = function (data)
		data = run_filter("bytes/in", data);
		if not data then
			return;
		end
		local ok, err = xml_stream:feed(data);
		if ok then
			return;
		end
		(session.log or log)("warn", "Received invalid XML: %s", data);
		(session.log or log)("warn", "Problem was: %s", err);
		session:close("not-well-formed");
	end

	session.close = session_close;

	local process = stream_callbacks.handlestanza;
	session.dispatch_stanza = function (origin, stanza)
		return process(origin, stanza);
	end

	-- Remember which connection the timer was armed for
	local initial_conn = session.conn;
	add_task(connect_timeout, function ()
		local established = session.type == "s2sin" or session.type == "s2sout";
		if session.conn ~= initial_conn or session.connecting or established then
			return; -- Ok, we're connect[ed|ing]
		end
		-- Not connected, need to close session and clean up
		local from_name = session.from_host or "(unknown)";
		local to_name = session.to_host or "(unknown)";
		(session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
			from_name, to_name);
		session:close("connection-timeout");
	end);
end
380 | |
--- Connection established. Connections that already have a session (an
-- existing outgoing session) are left alone; otherwise a new incoming session
-- is created, given a filtered `sends2s` writer and initialized.
function listener.onconnect(conn)
	if sessions[conn] then -- May be an existing outgoing session
		return;
	end

	local session = s2s_new_incoming(conn);
	sessions[conn] = session;
	session.log("debug", "Incoming s2s connection");

	local filter = initialize_filters(session);
	local write = conn.write;
	-- Accepts a stanza or a string; applies the outgoing filters before writing
	session.sends2s = function (t)
		log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
		if t.name then
			t = filter("stanzas/out", t);
		end
		if not t then
			return;
		end
		t = filter("bytes/out", tostring(t));
		if not t then
			return;
		end
		return write(conn, t);
	end

	initialize_session(session);
end
405 | |
--- Data arrived on a connection: pass it to the owning session, if there is one.
function listener.onincoming(conn, data)
	local session = sessions[conn];
	if not session then
		return;
	end
	session.data(data);
end
412 | |
--- Connection status change. Once the TLS handshake of an outgoing session
-- has completed, its stream header is sent; every other status is ignored.
function listener.onstatus(conn, status)
	if status ~= "ssl-handshake-complete" then
		return;
	end

	local session = sessions[conn];
	if not (session and session.direction == "outgoing") then
		return;
	end

	session.log("debug", "Sending stream header...");
	session:open_stream(session.from_host, session.to_host);
end
423 | |
--- Connection lost. The session's stream handler may decide to keep the
-- session alive; otherwise the connection is dropped from the session table.
function listener.ondisconnect(conn, err)
	local session = sessions[conn];
	if session and stream_callbacks.streamdisconnected(session, err) then
		return; -- Connection lives, for now
	end
	sessions[conn] = nil;
end
433 | |
--- Attach an outgoing session to its connection: the pair is recorded in the
-- session table, the session is marked as outgoing and then initialized.
function listener.register_outgoing(conn, session)
	sessions[conn] = session;
	session.direction = "outgoing";
	initialize_session(session);
end
439 | |
-- Give the outgoing-connection library the listener it should attach to its connections
s2sout.set_listener(listener);

-- Register the s2s listener as a network service
local portmanager = require "core.portmanager";
portmanager.register_service("s2s", {
	listener = listener;
	default_port = 5269;
	encryption = "starttls";
});
447 |