Software / code / prosody
Comparison
plugins/mod_c2s.lua @ 6054:7a5ddbaf758d
Merge 0.9->0.10
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 02 Apr 2014 17:41:38 +0100 |
parent | 6005:98b768a41c9d |
child | 6063:e626ee2fe106 |
Comparison legend: equal, deleted, inserted, replaced
6053:2f93a04564b2 | 6054:7a5ddbaf758d |
---|---|
1 -- Prosody IM | 1 -- Prosody IM |
2 -- Copyright (C) 2008-2010 Matthew Wild | 2 -- Copyright (C) 2008-2010 Matthew Wild |
3 -- Copyright (C) 2008-2010 Waqas Hussain | 3 -- Copyright (C) 2008-2010 Waqas Hussain |
4 -- | 4 -- |
5 -- This project is MIT/X11 licensed. Please see the | 5 -- This project is MIT/X11 licensed. Please see the |
6 -- COPYING file in the source package for more information. | 6 -- COPYING file in the source package for more information. |
7 -- | 7 -- |
8 | 8 |
9 module:set_global(); | 9 module:set_global(); |
13 local nameprep = require "util.encodings".stringprep.nameprep; | 13 local nameprep = require "util.encodings".stringprep.nameprep; |
14 local sessionmanager = require "core.sessionmanager"; | 14 local sessionmanager = require "core.sessionmanager"; |
15 local st = require "util.stanza"; | 15 local st = require "util.stanza"; |
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; | 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
17 local uuid_generate = require "util.uuid".generate; | 17 local uuid_generate = require "util.uuid".generate; |
18 local runner = require "util.async".runner; | |
18 | 19 |
19 local xpcall, tostring, type = xpcall, tostring, type; | 20 local xpcall, tostring, type = xpcall, tostring, type; |
20 local traceback = debug.traceback; | 21 local t_insert, t_remove = table.insert, table.remove; |
21 | 22 |
22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 23 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
23 | 24 |
24 local log = module._log; | 25 local log = module._log; |
25 | 26 |
29 | 30 |
30 local sessions = module:shared("sessions"); | 31 local sessions = module:shared("sessions"); |
31 local core_process_stanza = prosody.core_process_stanza; | 32 local core_process_stanza = prosody.core_process_stanza; |
32 local hosts = prosody.hosts; | 33 local hosts = prosody.hosts; |
33 | 34 |
34 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; | 35 local stream_callbacks = { default_ns = "jabber:client" }; |
35 local listener = {}; | 36 local listener = {}; |
37 local runner_callbacks = {}; | |
36 | 38 |
37 --- Stream events handlers | 39 --- Stream events handlers |
38 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
39 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | |
40 | 41 |
41 function stream_callbacks.streamopened(session, attr) | 42 function stream_callbacks.streamopened(session, attr) |
42 local send = session.send; | 43 local send = session.send; |
43 session.host = nameprep(attr.to); | 44 session.host = nameprep(attr.to); |
44 if not session.host then | 45 if not session.host then |
48 end | 49 end |
49 session.version = tonumber(attr.version) or 0; | 50 session.version = tonumber(attr.version) or 0; |
50 session.streamid = uuid_generate(); | 51 session.streamid = uuid_generate(); |
51 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); | 52 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); |
52 | 53 |
53 if not hosts[session.host] then | 54 if not hosts[session.host] or not hosts[session.host].modules.c2s then |
54 -- We don't serve this host... | 55 -- We don't serve this host... |
55 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; | 56 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; |
56 return; | 57 return; |
57 end | 58 end |
58 | 59 |
59 send("<?xml version='1.0'?>"..st.stanza("stream:stream", { | 60 session:open_stream(); |
60 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; | |
61 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()); | |
62 | 61 |
63 (session.log or log)("debug", "Sent reply <stream:stream> to client"); | 62 (session.log or log)("debug", "Sent reply <stream:stream> to client"); |
64 session.notopen = nil; | 63 session.notopen = nil; |
65 | 64 |
66 -- If session.secure is *false* (not nil) then it means we /were/ encrypting | 65 -- If session.secure is *false* (not nil) then it means we /were/ encrypting |
67 -- since we now have a new stream header, session is secured | 66 -- since we now have a new stream header, session is secured |
68 if session.secure == false then | 67 if session.secure == false then |
69 session.secure = true; | 68 session.secure = true; |
70 | 69 session.encrypted = true; |
71 -- Check if TLS compression is used | 70 |
72 local sock = session.conn:socket(); | 71 local sock = session.conn:socket(); |
73 if sock.info then | 72 if sock.info then |
74 session.compressed = sock:info"compression"; | 73 local info = sock:info(); |
75 elseif sock.compression then | 74 (session.log or log)("info", "Stream encrypted (%s with %s)", info.protocol, info.cipher); |
76 session.compressed = sock:compression(); --COMPAT mw/luasec-hg | 75 session.compressed = info.compression; |
76 else | |
77 (session.log or log)("info", "Stream encrypted"); | |
78 session.compressed = sock.compression and sock:compression(); --COMPAT mw/luasec-hg | |
77 end | 79 end |
78 end | 80 end |
79 | 81 |
80 local features = st.stanza("stream:features"); | 82 local features = st.stanza("stream:features"); |
81 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 83 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
82 module:fire_event("stream-features", session, features); | |
83 | |
84 send(features); | 84 send(features); |
85 end | 85 end |
86 | 86 |
87 function stream_callbacks.streamclosed(session) | 87 function stream_callbacks.streamclosed(session) |
88 session.log("debug", "Received </stream:stream>"); | 88 session.log("debug", "Received </stream:stream>"); |
114 session.log("info", "Session closed by remote with error: %s", text); | 114 session.log("info", "Session closed by remote with error: %s", text); |
115 session:close(nil, text); | 115 session:close(nil, text); |
116 end | 116 end |
117 end | 117 end |
118 | 118 |
119 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end | |
120 function stream_callbacks.handlestanza(session, stanza) | 119 function stream_callbacks.handlestanza(session, stanza) |
121 stanza = session.filter("stanzas/in", stanza); | 120 stanza = session.filter("stanzas/in", stanza); |
122 if stanza then | 121 session.thread:run(stanza); |
123 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); | |
124 end | |
125 end | 122 end |
126 | 123 |
127 --- Session methods | 124 --- Session methods |
128 local function session_close(session, reason) | 125 local function session_close(session, reason) |
129 local log = session.log or log; | 126 local log = session.log or log; |
130 if session.conn then | 127 if session.conn then |
131 if session.notopen then | 128 if session.notopen then |
132 session.send("<?xml version='1.0'?>"); | 129 session:open_stream(); |
133 session.send(st.stanza("stream:stream", default_stream_attr):top_tag()); | |
134 end | 130 end |
135 if reason then -- nil == no err, initiated by us, false == initiated by client | 131 if reason then -- nil == no err, initiated by us, false == initiated by client |
136 local stream_error = st.stanza("stream:error"); | 132 local stream_error = st.stanza("stream:error"); |
137 if type(reason) == "string" then -- assume stream error | 133 if type(reason) == "string" then -- assume stream error |
138 stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); | 134 stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); |
151 end | 147 end |
152 stream_error = tostring(stream_error); | 148 stream_error = tostring(stream_error); |
153 log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); | 149 log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); |
154 session.send(stream_error); | 150 session.send(stream_error); |
155 end | 151 end |
156 | 152 |
157 session.send("</stream:stream>"); | 153 session.send("</stream:stream>"); |
158 function session.send() return false; end | 154 function session.send() return false; end |
159 | 155 |
160 local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; | 156 local reason = (reason and (reason.name or reason.text or reason.condition)) or reason; |
161 session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); | 157 session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); |
162 | 158 |
163 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote | 159 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote |
164 local conn = session.conn; | 160 local conn = session.conn; |
165 if reason == nil and not session.notopen and session.type == "c2s" then | 161 if reason == nil and not session.notopen and session.type == "c2s" then |
166 -- Grace time to process data from authenticated cleanly-closed stream | 162 -- Grace time to process data from authenticated cleanly-closed stream |
176 conn:close(); | 172 conn:close(); |
177 end | 173 end |
178 end | 174 end |
179 end | 175 end |
180 | 176 |
177 local function session_open_stream(session) | |
178 local attr = { | |
179 ["xmlns:stream"] = 'http://etherx.jabber.org/streams', | |
180 xmlns = stream_callbacks.default_ns, | |
181 version = "1.0", | |
182 ["xml:lang"] = 'en', | |
183 id = session.streamid or "", | |
184 from = session.host | |
185 }; | |
186 session.send("<?xml version='1.0'?>"); | |
187 session.send(st.stanza("stream:stream", attr):top_tag()); | |
188 end | |
189 | |
181 module:hook_global("user-deleted", function(event) | 190 module:hook_global("user-deleted", function(event) |
182 local username, host = event.username, event.host; | 191 local username, host = event.username, event.host; |
183 local user = hosts[host].sessions[username]; | 192 local user = hosts[host].sessions[username]; |
184 if user and user.sessions then | 193 if user and user.sessions then |
185 for jid, session in pairs(user.sessions) do | 194 for jid, session in pairs(user.sessions) do |
186 session:close{ condition = "not-authorized", text = "Account deleted" }; | 195 session:close{ condition = "not-authorized", text = "Account deleted" }; |
187 end | 196 end |
188 end | 197 end |
189 end, 200); | 198 end, 200); |
190 | 199 |
200 function runner_callbacks:ready() | |
201 self.data.conn:resume(); | |
202 end | |
203 | |
204 function runner_callbacks:waiting() | |
205 self.data.conn:pause(); | |
206 end | |
207 | |
208 function runner_callbacks:error(err) | |
209 (self.data.log or log)("error", "Traceback[c2s]: %s", err); | |
210 end | |
211 | |
191 --- Port listener | 212 --- Port listener |
192 function listener.onconnect(conn) | 213 function listener.onconnect(conn) |
193 local session = sm_new_session(conn); | 214 local session = sm_new_session(conn); |
194 sessions[conn] = session; | 215 sessions[conn] = session; |
195 | 216 |
196 session.log("info", "Client connected"); | 217 session.log("info", "Client connected"); |
197 | 218 |
198 -- Client is using legacy SSL (otherwise mod_tls sets this flag) | 219 -- Client is using legacy SSL (otherwise mod_tls sets this flag) |
199 if conn:ssl() then | 220 if conn:ssl() then |
200 session.secure = true; | 221 session.secure = true; |
222 session.encrypted = true; | |
201 | 223 |
202 -- Check if TLS compression is used | 224 -- Check if TLS compression is used |
203 local sock = conn:socket(); | 225 local sock = conn:socket(); |
204 if sock.info then | 226 if sock.info then |
205 session.compressed = sock:info"compression"; | 227 session.compressed = sock:info"compression"; |
206 elseif sock.compression then | 228 elseif sock.compression then |
207 session.compressed = sock:compression(); --COMPAT mw/luasec-hg | 229 session.compressed = sock:compression(); --COMPAT mw/luasec-hg |
208 end | 230 end |
209 end | 231 end |
210 | 232 |
211 if opt_keepalives then | 233 if opt_keepalives then |
212 conn:setoption("keepalive", opt_keepalives); | 234 conn:setoption("keepalive", opt_keepalives); |
213 end | 235 end |
214 | 236 |
237 session.open_stream = session_open_stream; | |
215 session.close = session_close; | 238 session.close = session_close; |
216 | 239 |
217 local stream = new_xmpp_stream(session, stream_callbacks); | 240 local stream = new_xmpp_stream(session, stream_callbacks); |
218 session.stream = stream; | 241 session.stream = stream; |
219 session.notopen = true; | 242 session.notopen = true; |
220 | 243 |
221 function session.reset_stream() | 244 function session.reset_stream() |
222 session.notopen = true; | 245 session.notopen = true; |
223 session.stream:reset(); | 246 session.stream:reset(); |
224 end | 247 end |
225 | 248 |
249 session.thread = runner(function (stanza) | |
250 core_process_stanza(session, stanza); | |
251 end, runner_callbacks, session); | |
252 | |
226 local filter = session.filter; | 253 local filter = session.filter; |
227 function session.data(data) | 254 function session.data(data) |
228 data = filter("bytes/in", data); | 255 -- Parse the data, which will store stanzas in session.pending_stanzas |
229 if data then | 256 if data then |
230 local ok, err = stream:feed(data); | 257 data = filter("bytes/in", data); |
231 if ok then return; end | 258 if data then |
232 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | 259 local ok, err = stream:feed(data); |
233 session:close("not-well-formed"); | 260 if not ok then |
234 end | 261 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
235 end | 262 session:close("not-well-formed"); |
236 | 263 end |
237 | 264 end |
265 end | |
266 end | |
267 | |
238 if c2s_timeout then | 268 if c2s_timeout then |
239 add_task(c2s_timeout, function () | 269 add_task(c2s_timeout, function () |
240 if session.type == "c2s_unauthed" then | 270 if session.type == "c2s_unauthed" then |
241 session:close("connection-timeout"); | 271 session:close("connection-timeout"); |
242 end | 272 end |
260 sm_destroy_session(session, err); | 290 sm_destroy_session(session, err); |
261 sessions[conn] = nil; | 291 sessions[conn] = nil; |
262 end | 292 end |
263 end | 293 end |
264 | 294 |
295 function listener.onreadtimeout(conn) | |
296 local session = sessions[conn]; | |
297 if session then | |
298 return (hosts[session.host] or prosody).events.fire_event("c2s-read-timeout", { session = session }); | |
299 end | |
300 end | |
301 | |
302 local function keepalive(event) | |
303 return event.session.send(' '); | |
304 end | |
305 | |
265 function listener.associate_session(conn, session) | 306 function listener.associate_session(conn, session) |
266 sessions[conn] = session; | 307 sessions[conn] = session; |
267 end | 308 end |
309 | |
310 function module.add_host(module) | |
311 module:hook("c2s-read-timeout", keepalive, -1); | |
312 end | |
313 | |
314 module:hook("c2s-read-timeout", keepalive, -1); | |
268 | 315 |
269 module:hook("server-stopping", function(event) | 316 module:hook("server-stopping", function(event) |
270 local reason = event.reason; | 317 local reason = event.reason; |
271 for _, session in pairs(sessions) do | 318 for _, session in pairs(sessions) do |
272 session:close{ condition = "system-shutdown", text = reason }; | 319 session:close{ condition = "system-shutdown", text = reason }; |