Software /
code /
prosody-modules
Comparison
mod_websocket/mod_websocket.lua @ 895:1f4d77104da5
mod_websocket: Simplify by getting the c2s_listener from mod_c2s
author | Florian Zeitz <florob@babelmonkeys.de> |
---|---|
date | Sat, 12 Jan 2013 17:33:52 +0100 |
parent | 857:1393af36ec9c |
child | 905:eae665bc2122 |
comparison
equal
deleted
inserted
replaced
894:d066987e00b7 | 895:1f4d77104da5 |
---|---|
1 -- Prosody IM | 1 -- Prosody IM |
2 -- Copyright (C) 2008-2010 Matthew Wild | |
3 -- Copyright (C) 2008-2010 Waqas Hussain | |
4 -- Copyright (C) 2012 Florian Zeitz | 2 -- Copyright (C) 2012 Florian Zeitz |
5 -- | 3 -- |
6 -- This project is MIT/X11 licensed. Please see the | 4 -- This project is MIT/X11 licensed. Please see the |
7 -- COPYING file in the source package for more information. | 5 -- COPYING file in the source package for more information. |
8 -- | 6 -- |
9 | 7 |
10 module:set_global(); | 8 module:set_global(); |
11 | 9 |
12 local add_task = require "util.timer".add_task; | 10 local add_filter = require "util.filters".add_filter; |
13 local new_xmpp_stream = require "util.xmppstream".new; | |
14 local nameprep = require "util.encodings".stringprep.nameprep; | |
15 local sessionmanager = require "core.sessionmanager"; | |
16 local st = require "util.stanza"; | |
17 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; | |
18 local uuid_generate = require "util.uuid".generate; | |
19 local sha1 = require "util.hashes".sha1; | 11 local sha1 = require "util.hashes".sha1; |
20 local base64 = require "util.encodings".base64.encode; | 12 local base64 = require "util.encodings".base64.encode; |
21 local band = require "bit".band; | 13 local softreq = require "util.dependencies".softreq; |
22 local bxor = require "bit".bxor; | 14 local portmanager = require "core.portmanager"; |
23 | 15 |
24 local xpcall, tostring, type = xpcall, tostring, type; | 16 local bit = softreq"bit" or softreq"bit32" or module:log("error", "No bit module found. Either LuaJIT 2 or Lua 5.2 is required"); |
25 local traceback = debug.traceback; | 17 local band = bit.band; |
26 | 18 local bxor = bit.bxor; |
27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | |
28 | |
29 local log = module._log; | |
30 | |
31 local c2s_timeout = module:get_option_number("c2s_timeout"); | |
32 local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5); | |
33 local opt_keepalives = module:get_option_boolean("tcp_keepalives", false); | |
34 | 19 |
35 local cross_domain = module:get_option("cross_domain_websocket"); | 20 local cross_domain = module:get_option("cross_domain_websocket"); |
36 if cross_domain then | 21 if cross_domain then |
37 if cross_domain == true then | 22 if cross_domain == true then |
38 cross_domain = "*"; | 23 cross_domain = "*"; |
42 if type(cross_domain) ~= "string" then | 27 if type(cross_domain) ~= "string" then |
43 cross_domain = nil; | 28 cross_domain = nil; |
44 end | 29 end |
45 end | 30 end |
46 | 31 |
47 local sessions = module:shared("sessions"); | 32 module:depends("c2s") |
48 local core_process_stanza = prosody.core_process_stanza; | 33 local sessions = module:shared("c2s/sessions"); |
49 | 34 local c2s_listener = portmanager.get_service("c2s").listener; |
50 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; | |
51 local listener = {}; | |
52 | 35 |
53 -- Websocket helpers | 36 -- Websocket helpers |
54 local function parse_frame(frame) | 37 local function parse_frame(frame) |
55 local result = {}; | 38 local result = {}; |
56 local pos = 1; | 39 local pos = 1; |
129 result = result .. data; | 112 result = result .. data; |
130 | 113 |
131 return result; | 114 return result; |
132 end | 115 end |
133 | 116 |
134 --- Stream events handlers | 117 --- Filter stuff |
135 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 118 function handle_request(event, path) |
136 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | 119 local request, response = event.request, event.response; |
137 | 120 local conn = response.conn; |
138 function stream_callbacks.streamopened(session, attr) | 121 |
139 local send = session.send; | 122 if not request.headers.sec_websocket_key then |
140 session.host = nameprep(attr.to); | 123 response.headers.content_type = "text/html"; |
141 if not session.host then | 124 return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body> |
142 session:close{ condition = "improper-addressing", | 125 <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p> |
143 text = "A valid 'to' attribute is required on stream headers" }; | 126 </body></html>]]; |
144 return; | 127 end |
145 end | 128 |
146 session.version = tonumber(attr.version) or 0; | 129 local wants_xmpp = false; |
147 session.streamid = uuid_generate(); | 130 (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto) |
148 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); | 131 if proto == "xmpp" then wants_xmpp = true; end |
149 | 132 end); |
150 if not hosts[session.host] then | 133 |
151 -- We don't serve this host... | 134 if not wants_xmpp then |
152 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; | 135 return 501; |
153 return; | |
154 end | |
155 | |
156 -- COMPAT: Some current client implementations need this to be self-closing | |
157 if session.self_closing_stream then | |
158 send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", { | |
159 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; | |
160 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }))); | |
161 else | |
162 send("<?xml version='1.0'?>"..st.stanza("stream:stream", { | |
163 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; | |
164 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()); | |
165 end | |
166 | |
167 (session.log or log)("debug", "Sent reply <stream:stream> to client"); | |
168 session.notopen = nil; | |
169 | |
170 -- If session.secure is *false* (not nil) then it means we /were/ encrypting | |
171 -- since we now have a new stream header, session is secured | |
172 if session.secure == false then | |
173 session.secure = true; | |
174 end | |
175 | |
176 local features = st.stanza("stream:features"); | |
177 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | |
178 module:fire_event("stream-features", session, features); | |
179 | |
180 send(features); | |
181 end | |
182 | |
183 function stream_callbacks.streamclosed(session) | |
184 session.log("debug", "Received </stream:stream>"); | |
185 session:close(false); | |
186 end | |
187 | |
188 function stream_callbacks.error(session, error, data) | |
189 if error == "no-stream" then | |
190 session.log("debug", "Invalid opening stream header"); | |
191 session:close("invalid-namespace"); | |
192 elseif error == "parse-error" then | |
193 (session.log or log)("debug", "Client XML parse error: %s", tostring(data)); | |
194 session:close("not-well-formed"); | |
195 elseif error == "stream-error" then | |
196 local condition, text = "undefined-condition"; | |
197 for child in data:children() do | |
198 if child.attr.xmlns == xmlns_xmpp_streams then | |
199 if child.name ~= "text" then | |
200 condition = child.name; | |
201 else | |
202 text = child:get_text(); | |
203 end | |
204 if condition ~= "undefined-condition" and text then | |
205 break; | |
206 end | |
207 end | |
208 end | |
209 text = condition .. (text and (" ("..text..")") or ""); | |
210 session.log("info", "Session closed by remote with error: %s", text); | |
211 session:close(nil, text); | |
212 end | |
213 end | |
214 | |
215 local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end | |
216 function stream_callbacks.handlestanza(session, stanza) | |
217 stanza = session.filter("stanzas/in", stanza); | |
218 if stanza then | |
219 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); | |
220 end | |
221 end | |
222 | |
223 --- Session methods | |
224 local function session_close(session, reason) | |
225 local log = session.log or log; | |
226 if session.conn then | |
227 if session.notopen then | |
228 -- COMPAT: Some current client implementations need this to be self-closing | |
229 if session.self_closing_stream then | |
230 session.send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", default_stream_attr))); | |
231 else | |
232 session.send("<?xml version='1.0'?>"..st.stanza("stream:stream", default_stream_attr):top_tag()); | |
233 end | |
234 end | |
235 if reason then -- nil == no err, initiated by us, false == initiated by client | |
236 if type(reason) == "string" then -- assume stream error | |
237 log("debug", "Disconnecting client, <stream:error> is: %s", reason); | |
238 session.send(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' })); | |
239 elseif type(reason) == "table" then | |
240 if reason.condition then | |
241 local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up(); | |
242 if reason.text then | |
243 stanza:tag("text", stream_xmlns_attr):text(reason.text):up(); | |
244 end | |
245 if reason.extra then | |
246 stanza:add_child(reason.extra); | |
247 end | |
248 log("debug", "Disconnecting client, <stream:error> is: %s", tostring(stanza)); | |
249 session.send(stanza); | |
250 elseif reason.name then -- a stanza | |
251 log("debug", "Disconnecting client, <stream:error> is: %s", tostring(reason)); | |
252 session.send(reason); | |
253 end | |
254 end | |
255 end | |
256 session.send("</stream:stream>"); | |
257 function session.send() return false; end | |
258 | |
259 local reason = (reason and (reason.text or reason.condition)) or reason; | |
260 session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed"); | |
261 | |
262 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote | |
263 local conn = session.conn; | |
264 if reason == nil and not session.notopen and session.type == "c2s" then | |
265 -- Grace time to process data from authenticated cleanly-closed stream | |
266 add_task(stream_close_timeout, function () | |
267 if not session.destroyed then | |
268 session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); | |
269 sm_destroy_session(session, reason); | |
270 conn:close(); | |
271 end | |
272 end); | |
273 else | |
274 sm_destroy_session(session, reason); | |
275 conn:close(); | |
276 end | |
277 end | |
278 end | |
279 | |
280 module:hook_global("user-deleted", function(event) | |
281 local username, host = event.username, event.host; | |
282 local user = hosts[host].sessions[username]; | |
283 if user and user.sessions then | |
284 for jid, session in pairs(user.sessions) do | |
285 session:close{ condition = "not-authorized", text = "Account deleted" }; | |
286 end | |
287 end | |
288 end, 200); | |
289 | |
290 --- Port listener | |
291 function listener.onconnect(conn) | |
292 local session = sm_new_session(conn); | |
293 sessions[conn] = session; | |
294 | |
295 session.log("info", "Client connected"); | |
296 | |
297 -- Client is using legacy SSL (otherwise mod_tls sets this flag) | |
298 if conn:ssl() then | |
299 session.secure = true; | |
300 end | |
301 | |
302 if opt_keepalives then | |
303 conn:setoption("keepalive", opt_keepalives); | |
304 end | |
305 | |
306 session.close = session_close; | |
307 | |
308 session.conn.starttls = nil; | |
309 | |
310 local stream = new_xmpp_stream(session, stream_callbacks); | |
311 session.stream = stream; | |
312 session.notopen = true; | |
313 | |
314 function session.reset_stream() | |
315 session.notopen = true; | |
316 session.stream:reset(); | |
317 end | 136 end |
318 | 137 |
319 local function websocket_close(code, message) | 138 local function websocket_close(code, message) |
320 local data = string.char(code/0x100) .. string.char(code%0x100) .. message; | 139 local data = string.char(code/0x100) .. string.char(code%0x100) .. message; |
321 conn:write(build_frame({opcode = 0x8, FIN = true, data = data})); | 140 conn:write(build_frame({opcode = 0x8, FIN = true, data = data})); |
322 conn:close(); | 141 conn:close(); |
323 end | 142 end |
324 | 143 |
325 local filter = session.filter; | |
326 local dataBuffer; | 144 local dataBuffer; |
327 local function handle_frame(frame) | 145 local function handle_frame(frame) |
328 module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data); | 146 module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data); |
329 | 147 |
330 -- Error cases | 148 -- Error cases |
363 dataBuffer = dataBuffer .. frame.data; | 181 dataBuffer = dataBuffer .. frame.data; |
364 elseif frame.opcode == 0x1 then -- Text frame | 182 elseif frame.opcode == 0x1 then -- Text frame |
365 dataBuffer = frame.data; | 183 dataBuffer = frame.data; |
366 elseif frame.opcode == 0x2 then -- Binary frame | 184 elseif frame.opcode == 0x2 then -- Binary frame |
367 websocket_close(1003, "Only text frames are supported"); | 185 websocket_close(1003, "Only text frames are supported"); |
368 return false; | 186 return; |
369 elseif frame.opcode == 0x8 then -- Close request | 187 elseif frame.opcode == 0x8 then -- Close request |
370 websocket_close(1000, "Goodbye"); | 188 websocket_close(1000, "Goodbye"); |
371 return false; | 189 return; |
372 elseif frame.opcode == 0x9 then -- Ping frame | 190 elseif frame.opcode == 0x9 then -- Ping frame |
373 frame.opcode = 0xA; | 191 frame.opcode = 0xA; |
374 conn:write(build_frame(frame)); | 192 conn:write(build_frame(frame)); |
375 return true; | 193 return ""; |
376 else | 194 else |
377 log("warn", "Received frame with unsupported opcode %i", frame.opcode); | 195 log("warn", "Received frame with unsupported opcode %i", frame.opcode); |
378 return true; | 196 return ""; |
379 end | 197 end |
380 | 198 |
381 if frame.FIN then | 199 if frame.FIN then |
382 data = dataBuffer; | 200 data = dataBuffer; |
383 dataBuffer = nil; | 201 dataBuffer = nil; |
384 | 202 |
385 -- COMPAT: Some current client implementations send a self-closing <stream:stream> | 203 return data; |
386 data, session.self_closing_stream = data:gsub("^(<stream:stream.*)/>$", "%1>"); | 204 end |
387 session.self_closing_stream = (session.self_closing_stream == 1) | 205 return ""; |
388 | 206 end |
389 data = filter("bytes/in", data); | 207 |
390 if data then | 208 conn:setlistener(c2s_listener); |
391 local ok, err = stream:feed(data); | 209 c2s_listener.onconnect(conn); |
392 if ok then return; end | |
393 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | |
394 session:close("not-well-formed"); | |
395 end | |
396 end | |
397 return true; | |
398 end | |
399 | 210 |
400 local frameBuffer = ""; | 211 local frameBuffer = ""; |
401 function session.data(data) | 212 add_filter(sessions[conn], "bytes/in", function(data) |
213 local cache = ""; | |
402 frameBuffer = frameBuffer .. data; | 214 frameBuffer = frameBuffer .. data; |
403 local frame, length = parse_frame(frameBuffer); | 215 local frame, length = parse_frame(frameBuffer); |
404 | 216 |
405 while frame do | 217 while frame do |
406 frameBuffer = frameBuffer:sub(length + 1); | 218 frameBuffer = frameBuffer:sub(length + 1); |
407 if not handle_frame(frame) then return; end | 219 local result = handle_frame(frame); |
220 if not result then return; end | |
221 cache = cache .. result; | |
408 frame, length = parse_frame(frameBuffer); | 222 frame, length = parse_frame(frameBuffer); |
409 end | 223 end |
410 end | 224 return cache; |
411 | 225 |
412 function session.send(s) | |
413 conn:write(build_frame({ FIN = true, opcode = 0x01, data = tostring(s)})); | |
414 end | |
415 | |
416 if c2s_timeout then | |
417 add_task(c2s_timeout, function () | |
418 if session.type == "c2s_unauthed" then | |
419 session:close("connection-timeout"); | |
420 end | |
421 end); | |
422 end | |
423 | |
424 session.dispatch_stanza = stream_callbacks.handlestanza; | |
425 end | |
426 | |
427 function listener.onincoming(conn, data) | |
428 local session = sessions[conn]; | |
429 if session then | |
430 session.data(data); | |
431 else | |
432 listener.onconnect(conn, data); | |
433 session = sessions[conn]; | |
434 session.data(data); | |
435 end | |
436 end | |
437 | |
438 function listener.ondisconnect(conn, err) | |
439 local session = sessions[conn]; | |
440 if session then | |
441 (session.log or log)("info", "Client disconnected: %s", err or "connection closed"); | |
442 sm_destroy_session(session, err); | |
443 sessions[conn] = nil; | |
444 end | |
445 end | |
446 | |
447 function listener.associate_session(conn, session) | |
448 sessions[conn] = session; | |
449 end | |
450 | |
451 function handle_request(event, path) | |
452 local request, response = event.request, event.response; | |
453 | |
454 if not request.headers.sec_websocket_key then | |
455 response.headers.content_type = "text/html"; | |
456 return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body> | |
457 <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p> | |
458 </body></html>]]; | |
459 end | |
460 | |
461 local wants_xmpp = false; | |
462 (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto) | |
463 if proto == "xmpp" then wants_xmpp = true; end | |
464 end); | 226 end); |
465 | 227 |
466 if not wants_xmpp then | 228 add_filter(sessions[conn], "bytes/out", function(data) |
467 return 501; | 229 return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)}); |
468 end | 230 end); |
469 | 231 |
470 response.conn:setlistener(listener); | |
471 response.status = "101 Switching Protocols"; | 232 response.status = "101 Switching Protocols"; |
472 response.headers.Upgrade = "websocket"; | 233 response.headers.upgrade = "websocket"; |
473 response.headers.Connection = "Upgrade"; | 234 response.headers.connection = "Upgrade"; |
474 response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); | 235 response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); |
475 response.headers.Sec_WebSocket_Protocol = "xmpp"; | 236 response.headers.sec_webSocket_protocol = "xmpp"; |
476 response.headers.Access_Control_Allow_Origin = cross_domain; | 237 response.headers.access_control_allow_origin = cross_domain; |
477 | 238 |
478 return ""; | 239 return ""; |
479 end | 240 end |
480 | 241 |
481 function module.add_host(module) | 242 function module.add_host(module) |