Software /
code /
prosody
Comparison
plugins/s2s/mod_s2s.lua @ 4580:351936a8de4a
mod_s2s: Split send_to_host() into two route/remote hooks, one for already existing sessions and one for non-existent ones.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 03 Mar 2012 00:03:06 +0100 |
parent | 4578:da0528c59c52 |
child | 4581:d2eb5962d235 |
comparison
equal
deleted
inserted
replaced
4579:5650c290fe8e | 4580:351936a8de4a |
---|---|
58 sendq[i] = nil; | 58 sendq[i] = nil; |
59 end | 59 end |
60 session.sendq = nil; | 60 session.sendq = nil; |
61 end | 61 end |
62 | 62 |
63 function send_to_host(from_host, to_host, stanza) | 63 module:hook("route/remote", function (event) |
64 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; | |
64 if not hosts[from_host] then | 65 if not hosts[from_host] then |
65 log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host); | 66 log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host); |
66 return false; | 67 return false; |
67 end | 68 end |
68 local host = hosts[from_host].s2sout[to_host]; | 69 local host = hosts[from_host].s2sout[to_host]; |
69 if host then | 70 if host then |
70 -- We have a connection to this host already | 71 -- We have a connection to this host already |
71 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then | 72 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then |
72 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); | 73 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); |
73 | 74 |
74 -- Queue stanza until we are able to send it | 75 -- Queue stanza until we are able to send it |
75 if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)}); | 76 if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)}); |
76 else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end | 77 else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end |
77 host.log("debug", "stanza [%s] queued ", stanza.name); | 78 host.log("debug", "stanza [%s] queued ", stanza.name); |
78 elseif host.type == "local" or host.type == "component" then | 79 elseif host.type == "local" or host.type == "component" then |
88 log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); | 89 log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); |
89 end | 90 end |
90 host.sends2s(stanza); | 91 host.sends2s(stanza); |
91 host.log("debug", "stanza sent over "..host.type); | 92 host.log("debug", "stanza sent over "..host.type); |
92 end | 93 end |
93 else | 94 end |
94 log("debug", "opening a new outgoing connection for this stanza"); | 95 end, 200); |
95 local host_session = s2s_new_outgoing(from_host, to_host); | |
96 | |
97 -- Store in buffer | |
98 host_session.bounce_sendq = bounce_sendq; | |
99 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; | |
100 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name)); | |
101 s2sout.initiate_connection(host_session); | |
102 if (not host_session.connecting) and (not host_session.conn) then | |
103 log("warn", "Connection to %s failed already, destroying session...", to_host); | |
104 if not s2s_destroy_session(host_session, "Connection failed") then | |
105 -- Already destroyed, we need to bounce our stanza | |
106 host_session:bounce_sendq(host_session.destruction_reason); | |
107 end | |
108 return false; | |
109 end | |
110 end | |
111 return true; | |
112 end | |
113 | 96 |
114 module:hook("route/remote", function (event) | 97 module:hook("route/remote", function (event) |
115 return send_to_host(event.from_host, event.to_host, event.stanza); | 98 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; |
116 end); | 99 log("debug", "opening a new outgoing connection for this stanza"); |
100 local host_session = s2s_new_outgoing(from_host, to_host); | |
101 | |
102 -- Store in buffer | |
103 host_session.bounce_sendq = bounce_sendq; | |
104 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; | |
105 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name)); | |
106 s2sout.initiate_connection(host_session); | |
107 if (not host_session.connecting) and (not host_session.conn) then | |
108 log("warn", "Connection to %s failed already, destroying session...", to_host); | |
109 if not s2s_destroy_session(host_session, "Connection failed") then | |
110 -- Already destroyed, we need to bounce our stanza | |
111 host_session:bounce_sendq(host_session.destruction_reason); | |
112 end | |
113 return false; | |
114 end | |
115 end, 100); | |
117 | 116 |
118 --- Helper to check that a session peer's certificate is valid | 117 --- Helper to check that a session peer's certificate is valid |
119 local function check_cert_status(session) | 118 local function check_cert_status(session) |
120 local conn = session.conn:socket() | 119 local conn = session.conn:socket() |
121 local cert | 120 local cert |
237 s2s_mark_connected(session); | 236 s2s_mark_connected(session); |
238 end | 237 end |
239 end | 238 end |
240 end | 239 end |
241 session.notopen = nil; | 240 session.notopen = nil; |
242 session.send = function(stanza) send_to_host(session.to_host, session.from_host, stanza); end; | 241 session.send = function(stanza) prosody.events.fire_event("route/remote", { from_host = session.to_host, to_host = session.from_host, stanza = stanza}) end; |
243 end | 242 end |
244 | 243 |
245 function stream_callbacks.streamclosed(session) | 244 function stream_callbacks.streamclosed(session) |
246 (session.log or log)("debug", "Received </stream:stream>"); | 245 (session.log or log)("debug", "Received </stream:stream>"); |
247 session:close(); | 246 session:close(); |