Comparison

plugins/s2s/mod_s2s.lua @ 4580:351936a8de4a

mod_s2s: Split send_to_host() into two route/remote hooks, one for already exsisting sessions and one for non-existent.
author Kim Alvefur <zash@zash.se>
date Sat, 03 Mar 2012 00:03:06 +0100
parent 4578:da0528c59c52
child 4581:d2eb5962d235
comparison
equal deleted inserted replaced
4579:5650c290fe8e 4580:351936a8de4a
58 sendq[i] = nil; 58 sendq[i] = nil;
59 end 59 end
60 session.sendq = nil; 60 session.sendq = nil;
61 end 61 end
62 62
63 function send_to_host(from_host, to_host, stanza) 63 module:hook("route/remote", function (event)
64 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
64 if not hosts[from_host] then 65 if not hosts[from_host] then
65 log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host); 66 log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
66 return false; 67 return false;
67 end 68 end
68 local host = hosts[from_host].s2sout[to_host]; 69 local host = hosts[from_host].s2sout[to_host];
69 if host then 70 if host then
70 -- We have a connection to this host already 71 -- We have a connection to this host already
71 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then 72 if host.type == "s2sout_unauthed" and (stanza.name ~= "db:verify" or not host.dialback_key) then
72 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); 73 (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
73 74
74 -- Queue stanza until we are able to send it 75 -- Queue stanza until we are able to send it
75 if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)}); 76 if host.sendq then t_insert(host.sendq, {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)});
76 else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end 77 else host.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; end
77 host.log("debug", "stanza [%s] queued ", stanza.name); 78 host.log("debug", "stanza [%s] queued ", stanza.name);
78 elseif host.type == "local" or host.type == "component" then 79 elseif host.type == "local" or host.type == "component" then
88 log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host)); 89 log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
89 end 90 end
90 host.sends2s(stanza); 91 host.sends2s(stanza);
91 host.log("debug", "stanza sent over "..host.type); 92 host.log("debug", "stanza sent over "..host.type);
92 end 93 end
93 else 94 end
94 log("debug", "opening a new outgoing connection for this stanza"); 95 end, 200);
95 local host_session = s2s_new_outgoing(from_host, to_host);
96
97 -- Store in buffer
98 host_session.bounce_sendq = bounce_sendq;
99 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
100 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
101 s2sout.initiate_connection(host_session);
102 if (not host_session.connecting) and (not host_session.conn) then
103 log("warn", "Connection to %s failed already, destroying session...", to_host);
104 if not s2s_destroy_session(host_session, "Connection failed") then
105 -- Already destroyed, we need to bounce our stanza
106 host_session:bounce_sendq(host_session.destruction_reason);
107 end
108 return false;
109 end
110 end
111 return true;
112 end
113 96
114 module:hook("route/remote", function (event) 97 module:hook("route/remote", function (event)
115 return send_to_host(event.from_host, event.to_host, event.stanza); 98 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
116 end); 99 log("debug", "opening a new outgoing connection for this stanza");
100 local host_session = s2s_new_outgoing(from_host, to_host);
101
102 -- Store in buffer
103 host_session.bounce_sendq = bounce_sendq;
104 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
105 log("debug", "stanza [%s] queued until connection complete", tostring(stanza.name));
106 s2sout.initiate_connection(host_session);
107 if (not host_session.connecting) and (not host_session.conn) then
108 log("warn", "Connection to %s failed already, destroying session...", to_host);
109 if not s2s_destroy_session(host_session, "Connection failed") then
110 -- Already destroyed, we need to bounce our stanza
111 host_session:bounce_sendq(host_session.destruction_reason);
112 end
113 return false;
114 end
115 end, 100);
117 116
118 --- Helper to check that a session peer's certificate is valid 117 --- Helper to check that a session peer's certificate is valid
119 local function check_cert_status(session) 118 local function check_cert_status(session)
120 local conn = session.conn:socket() 119 local conn = session.conn:socket()
121 local cert 120 local cert
237 s2s_mark_connected(session); 236 s2s_mark_connected(session);
238 end 237 end
239 end 238 end
240 end 239 end
241 session.notopen = nil; 240 session.notopen = nil;
242 session.send = function(stanza) send_to_host(session.to_host, session.from_host, stanza); end; 241 session.send = function(stanza) prosody.events.fire_event("route/remote", { from_host = session.to_host, to_host = session.from_host, stanza = stanza}) end;
243 end 242 end
244 243
245 function stream_callbacks.streamclosed(session) 244 function stream_callbacks.streamclosed(session)
246 (session.log or log)("debug", "Received </stream:stream>"); 245 (session.log or log)("debug", "Received </stream:stream>");
247 session:close(); 246 session:close();