Comparison

mod_websocket/mod_websocket.lua @ 895:1f4d77104da5

mod_websocket: Simplify by getting the c2s_listener from mod_c2s
author Florian Zeitz <florob@babelmonkeys.de>
date Sat, 12 Jan 2013 17:33:52 +0100
parent 857:1393af36ec9c
child 905:eae665bc2122
comparison
equal deleted inserted replaced
894:d066987e00b7 895:1f4d77104da5
1 -- Prosody IM 1 -- Prosody IM
2 -- Copyright (C) 2008-2010 Matthew Wild
3 -- Copyright (C) 2008-2010 Waqas Hussain
4 -- Copyright (C) 2012 Florian Zeitz 2 -- Copyright (C) 2012 Florian Zeitz
5 -- 3 --
6 -- This project is MIT/X11 licensed. Please see the 4 -- This project is MIT/X11 licensed. Please see the
7 -- COPYING file in the source package for more information. 5 -- COPYING file in the source package for more information.
8 -- 6 --
9 7
10 module:set_global(); 8 module:set_global();
11 9
12 local add_task = require "util.timer".add_task; 10 local add_filter = require "util.filters".add_filter;
13 local new_xmpp_stream = require "util.xmppstream".new;
14 local nameprep = require "util.encodings".stringprep.nameprep;
15 local sessionmanager = require "core.sessionmanager";
16 local st = require "util.stanza";
17 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
18 local uuid_generate = require "util.uuid".generate;
19 local sha1 = require "util.hashes".sha1; 11 local sha1 = require "util.hashes".sha1;
20 local base64 = require "util.encodings".base64.encode; 12 local base64 = require "util.encodings".base64.encode;
21 local band = require "bit".band; 13 local softreq = require "util.dependencies".softreq;
22 local bxor = require "bit".bxor; 14 local portmanager = require "core.portmanager";
23 15
24 local xpcall, tostring, type = xpcall, tostring, type; 16 local bit = softreq"bit" or softreq"bit32" or module:log("error", "No bit module found. Either LuaJIT 2 or Lua 5.2 is required");
25 local traceback = debug.traceback; 17 local band = bit.band;
26 18 local bxor = bit.bxor;
27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
28
29 local log = module._log;
30
31 local c2s_timeout = module:get_option_number("c2s_timeout");
32 local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5);
33 local opt_keepalives = module:get_option_boolean("tcp_keepalives", false);
34 19
35 local cross_domain = module:get_option("cross_domain_websocket"); 20 local cross_domain = module:get_option("cross_domain_websocket");
36 if cross_domain then 21 if cross_domain then
37 if cross_domain == true then 22 if cross_domain == true then
38 cross_domain = "*"; 23 cross_domain = "*";
42 if type(cross_domain) ~= "string" then 27 if type(cross_domain) ~= "string" then
43 cross_domain = nil; 28 cross_domain = nil;
44 end 29 end
45 end 30 end
46 31
47 local sessions = module:shared("sessions"); 32 module:depends("c2s")
48 local core_process_stanza = prosody.core_process_stanza; 33 local sessions = module:shared("c2s/sessions");
49 34 local c2s_listener = portmanager.get_service("c2s").listener;
50 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
51 local listener = {};
52 35
53 -- Websocket helpers 36 -- Websocket helpers
54 local function parse_frame(frame) 37 local function parse_frame(frame)
55 local result = {}; 38 local result = {};
56 local pos = 1; 39 local pos = 1;
129 result = result .. data; 112 result = result .. data;
130 113
131 return result; 114 return result;
132 end 115 end
133 116
134 --- Stream events handlers 117 --- Filter stuff
135 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 118 function handle_request(event, path)
136 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; 119 local request, response = event.request, event.response;
137 120 local conn = response.conn;
138 function stream_callbacks.streamopened(session, attr) 121
139 local send = session.send; 122 if not request.headers.sec_websocket_key then
140 session.host = nameprep(attr.to); 123 response.headers.content_type = "text/html";
141 if not session.host then 124 return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body>
142 session:close{ condition = "improper-addressing", 125 <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p>
143 text = "A valid 'to' attribute is required on stream headers" }; 126 </body></html>]];
144 return; 127 end
145 end 128
146 session.version = tonumber(attr.version) or 0; 129 local wants_xmpp = false;
147 session.streamid = uuid_generate(); 130 (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto)
148 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); 131 if proto == "xmpp" then wants_xmpp = true; end
149 132 end);
150 if not hosts[session.host] then 133
151 -- We don't serve this host... 134 if not wants_xmpp then
152 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; 135 return 501;
153 return;
154 end
155
156 -- COMPAT: Some current client implementations need this to be self-closing
157 if session.self_closing_stream then
158 send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", {
159 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
160 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' })));
161 else
162 send("<?xml version='1.0'?>"..st.stanza("stream:stream", {
163 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
164 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag());
165 end
166
167 (session.log or log)("debug", "Sent reply <stream:stream> to client");
168 session.notopen = nil;
169
170 -- If session.secure is *false* (not nil) then it means we /were/ encrypting
171 -- since we now have a new stream header, session is secured
172 if session.secure == false then
173 session.secure = true;
174 end
175
176 local features = st.stanza("stream:features");
177 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
178 module:fire_event("stream-features", session, features);
179
180 send(features);
181 end
182
183 function stream_callbacks.streamclosed(session)
184 session.log("debug", "Received </stream:stream>");
185 session:close(false);
186 end
187
188 function stream_callbacks.error(session, error, data)
189 if error == "no-stream" then
190 session.log("debug", "Invalid opening stream header");
191 session:close("invalid-namespace");
192 elseif error == "parse-error" then
193 (session.log or log)("debug", "Client XML parse error: %s", tostring(data));
194 session:close("not-well-formed");
195 elseif error == "stream-error" then
196 local condition, text = "undefined-condition";
197 for child in data:children() do
198 if child.attr.xmlns == xmlns_xmpp_streams then
199 if child.name ~= "text" then
200 condition = child.name;
201 else
202 text = child:get_text();
203 end
204 if condition ~= "undefined-condition" and text then
205 break;
206 end
207 end
208 end
209 text = condition .. (text and (" ("..text..")") or "");
210 session.log("info", "Session closed by remote with error: %s", text);
211 session:close(nil, text);
212 end
213 end
214
215 local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end
216 function stream_callbacks.handlestanza(session, stanza)
217 stanza = session.filter("stanzas/in", stanza);
218 if stanza then
219 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
220 end
221 end
222
223 --- Session methods
224 local function session_close(session, reason)
225 local log = session.log or log;
226 if session.conn then
227 if session.notopen then
228 -- COMPAT: Some current client implementations need this to be self-closing
229 if session.self_closing_stream then
230 session.send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", default_stream_attr)));
231 else
232 session.send("<?xml version='1.0'?>"..st.stanza("stream:stream", default_stream_attr):top_tag());
233 end
234 end
235 if reason then -- nil == no err, initiated by us, false == initiated by client
236 if type(reason) == "string" then -- assume stream error
237 log("debug", "Disconnecting client, <stream:error> is: %s", reason);
238 session.send(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
239 elseif type(reason) == "table" then
240 if reason.condition then
241 local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up();
242 if reason.text then
243 stanza:tag("text", stream_xmlns_attr):text(reason.text):up();
244 end
245 if reason.extra then
246 stanza:add_child(reason.extra);
247 end
248 log("debug", "Disconnecting client, <stream:error> is: %s", tostring(stanza));
249 session.send(stanza);
250 elseif reason.name then -- a stanza
251 log("debug", "Disconnecting client, <stream:error> is: %s", tostring(reason));
252 session.send(reason);
253 end
254 end
255 end
256 session.send("</stream:stream>");
257 function session.send() return false; end
258
259 local reason = (reason and (reason.text or reason.condition)) or reason;
260 session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
261
262 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
263 local conn = session.conn;
264 if reason == nil and not session.notopen and session.type == "c2s" then
265 -- Grace time to process data from authenticated cleanly-closed stream
266 add_task(stream_close_timeout, function ()
267 if not session.destroyed then
268 session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
269 sm_destroy_session(session, reason);
270 conn:close();
271 end
272 end);
273 else
274 sm_destroy_session(session, reason);
275 conn:close();
276 end
277 end
278 end
279
280 module:hook_global("user-deleted", function(event)
281 local username, host = event.username, event.host;
282 local user = hosts[host].sessions[username];
283 if user and user.sessions then
284 for jid, session in pairs(user.sessions) do
285 session:close{ condition = "not-authorized", text = "Account deleted" };
286 end
287 end
288 end, 200);
289
290 --- Port listener
291 function listener.onconnect(conn)
292 local session = sm_new_session(conn);
293 sessions[conn] = session;
294
295 session.log("info", "Client connected");
296
297 -- Client is using legacy SSL (otherwise mod_tls sets this flag)
298 if conn:ssl() then
299 session.secure = true;
300 end
301
302 if opt_keepalives then
303 conn:setoption("keepalive", opt_keepalives);
304 end
305
306 session.close = session_close;
307
308 session.conn.starttls = nil;
309
310 local stream = new_xmpp_stream(session, stream_callbacks);
311 session.stream = stream;
312 session.notopen = true;
313
314 function session.reset_stream()
315 session.notopen = true;
316 session.stream:reset();
317 end 136 end
318 137
319 local function websocket_close(code, message) 138 local function websocket_close(code, message)
320 local data = string.char(code/0x100) .. string.char(code%0x100) .. message; 139 local data = string.char(code/0x100) .. string.char(code%0x100) .. message;
321 conn:write(build_frame({opcode = 0x8, FIN = true, data = data})); 140 conn:write(build_frame({opcode = 0x8, FIN = true, data = data}));
322 conn:close(); 141 conn:close();
323 end 142 end
324 143
325 local filter = session.filter;
326 local dataBuffer; 144 local dataBuffer;
327 local function handle_frame(frame) 145 local function handle_frame(frame)
328 module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data); 146 module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data);
329 147
330 -- Error cases 148 -- Error cases
363 dataBuffer = dataBuffer .. frame.data; 181 dataBuffer = dataBuffer .. frame.data;
364 elseif frame.opcode == 0x1 then -- Text frame 182 elseif frame.opcode == 0x1 then -- Text frame
365 dataBuffer = frame.data; 183 dataBuffer = frame.data;
366 elseif frame.opcode == 0x2 then -- Binary frame 184 elseif frame.opcode == 0x2 then -- Binary frame
367 websocket_close(1003, "Only text frames are supported"); 185 websocket_close(1003, "Only text frames are supported");
368 return false; 186 return;
369 elseif frame.opcode == 0x8 then -- Close request 187 elseif frame.opcode == 0x8 then -- Close request
370 websocket_close(1000, "Goodbye"); 188 websocket_close(1000, "Goodbye");
371 return false; 189 return;
372 elseif frame.opcode == 0x9 then -- Ping frame 190 elseif frame.opcode == 0x9 then -- Ping frame
373 frame.opcode = 0xA; 191 frame.opcode = 0xA;
374 conn:write(build_frame(frame)); 192 conn:write(build_frame(frame));
375 return true; 193 return "";
376 else 194 else
377 log("warn", "Received frame with unsupported opcode %i", frame.opcode); 195 log("warn", "Received frame with unsupported opcode %i", frame.opcode);
378 return true; 196 return "";
379 end 197 end
380 198
381 if frame.FIN then 199 if frame.FIN then
382 data = dataBuffer; 200 data = dataBuffer;
383 dataBuffer = nil; 201 dataBuffer = nil;
384 202
385 -- COMPAT: Some current client implementations send a self-closing <stream:stream> 203 return data;
386 data, session.self_closing_stream = data:gsub("^(<stream:stream.*)/>$", "%1>"); 204 end
387 session.self_closing_stream = (session.self_closing_stream == 1) 205 return "";
388 206 end
389 data = filter("bytes/in", data); 207
390 if data then 208 conn:setlistener(c2s_listener);
391 local ok, err = stream:feed(data); 209 c2s_listener.onconnect(conn);
392 if ok then return; end
393 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
394 session:close("not-well-formed");
395 end
396 end
397 return true;
398 end
399 210
400 local frameBuffer = ""; 211 local frameBuffer = "";
401 function session.data(data) 212 add_filter(sessions[conn], "bytes/in", function(data)
213 local cache = "";
402 frameBuffer = frameBuffer .. data; 214 frameBuffer = frameBuffer .. data;
403 local frame, length = parse_frame(frameBuffer); 215 local frame, length = parse_frame(frameBuffer);
404 216
405 while frame do 217 while frame do
406 frameBuffer = frameBuffer:sub(length + 1); 218 frameBuffer = frameBuffer:sub(length + 1);
407 if not handle_frame(frame) then return; end 219 local result = handle_frame(frame);
220 if not result then return; end
221 cache = cache .. result;
408 frame, length = parse_frame(frameBuffer); 222 frame, length = parse_frame(frameBuffer);
409 end 223 end
410 end 224 return cache;
411 225
412 function session.send(s)
413 conn:write(build_frame({ FIN = true, opcode = 0x01, data = tostring(s)}));
414 end
415
416 if c2s_timeout then
417 add_task(c2s_timeout, function ()
418 if session.type == "c2s_unauthed" then
419 session:close("connection-timeout");
420 end
421 end);
422 end
423
424 session.dispatch_stanza = stream_callbacks.handlestanza;
425 end
426
427 function listener.onincoming(conn, data)
428 local session = sessions[conn];
429 if session then
430 session.data(data);
431 else
432 listener.onconnect(conn, data);
433 session = sessions[conn];
434 session.data(data);
435 end
436 end
437
438 function listener.ondisconnect(conn, err)
439 local session = sessions[conn];
440 if session then
441 (session.log or log)("info", "Client disconnected: %s", err or "connection closed");
442 sm_destroy_session(session, err);
443 sessions[conn] = nil;
444 end
445 end
446
447 function listener.associate_session(conn, session)
448 sessions[conn] = session;
449 end
450
451 function handle_request(event, path)
452 local request, response = event.request, event.response;
453
454 if not request.headers.sec_websocket_key then
455 response.headers.content_type = "text/html";
456 return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body>
457 <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p>
458 </body></html>]];
459 end
460
461 local wants_xmpp = false;
462 (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto)
463 if proto == "xmpp" then wants_xmpp = true; end
464 end); 226 end);
465 227
466 if not wants_xmpp then 228 add_filter(sessions[conn], "bytes/out", function(data)
467 return 501; 229 return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
468 end 230 end);
469 231
470 response.conn:setlistener(listener);
471 response.status = "101 Switching Protocols"; 232 response.status = "101 Switching Protocols";
472 response.headers.Upgrade = "websocket"; 233 response.headers.upgrade = "websocket";
473 response.headers.Connection = "Upgrade"; 234 response.headers.connection = "Upgrade";
474 response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); 235 response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
475 response.headers.Sec_WebSocket_Protocol = "xmpp"; 236 response.headers.sec_webSocket_protocol = "xmpp";
476 response.headers.Access_Control_Allow_Origin = cross_domain; 237 response.headers.access_control_allow_origin = cross_domain;
477 238
478 return ""; 239 return "";
479 end 240 end
480 241
481 function module.add_host(module) 242 function module.add_host(module)