Comparison

mod_websocket/mod_websocket.lua @ 676:54fa9d6d7809

mod_websocket: New mod_c2s based version, still WIP
author Florian Zeitz <florob@babelmonkeys.de>
date Fri, 25 May 2012 17:20:41 +0200
parent 129:5fc00a3e47b5
child 677:eeb41cd5e9f3
Comparison (left column: old revision, right column: new revision)
Legend: equal, deleted, inserted, replaced
675:da33325453fb 676:54fa9d6d7809
1 module.host = "*" -- Global module 1 -- Prosody IM
2 2 -- Copyright (C) 2008-2010 Matthew Wild
3 local logger = require "util.logger"; 3 -- Copyright (C) 2008-2010 Waqas Hussain
4 local log = logger.init("mod_websocket"); 4 -- Copyright (C) 2012 Florian Zeitz
5 local httpserver = require "net.httpserver"; 5 --
6 local lxp = require "lxp"; 6 -- This project is MIT/X11 licensed. Please see the
7 local init_xmlhandlers = require "core.xmlhandlers"; 7 -- COPYING file in the source package for more information.
8 --
9
10 module:set_global();
11
12 local add_task = require "util.timer".add_task;
13 local new_xmpp_stream = require "util.xmppstream".new;
14 local nameprep = require "util.encodings".stringprep.nameprep;
15 local sessionmanager = require "core.sessionmanager";
8 local st = require "util.stanza"; 16 local st = require "util.stanza";
9 local sm = require "core.sessionmanager"; 17 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
10 18 local uuid_generate = require "util.uuid".generate;
11 local sessions = {}; 19 local sha1 = require "util.hashes".sha1;
12 local default_headers = { }; 20 local base64 = require "util.encodings".base64.encode;
13 21 local bxor = require "bit".bxor;
14 22 local tohex = require "bit".tohex;
15 local stream_callbacks = { default_ns = "jabber:client", 23
16 streamopened = sm.streamopened, 24 module:depends("http")
17 streamclosed = sm.streamclosed, 25
18 handlestanza = core_process_stanza }; 26
27 local xpcall, tostring, type = xpcall, tostring, type;
28 local traceback = debug.traceback;
29
30 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
31
32 local log = module._log;
33
34 local c2s_timeout = module:get_option_number("c2s_timeout");
35 local opt_keepalives = module:get_option_boolean("tcp_keepalives", false);
36
37 local sessions = module:shared("sessions");
38
39 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
40 local listener = {};
41
42 --- Stream events handlers
43 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
44 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
45
46 function stream_callbacks.streamopened(session, attr)
47 local send = session.send;
48 session.host = nameprep(attr.to);
49 if not session.host then
50 session:close{ condition = "improper-addressing",
51 text = "A valid 'to' attribute is required on stream headers" };
52 return;
53 end
54 session.version = tonumber(attr.version) or 0;
55 session.streamid = uuid_generate();
56 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host);
57
58 if not hosts[session.host] then
59 -- We don't serve this host...
60 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)};
61 return;
62 end
63
64 send("<?xml version='1.0'?>"..(tostring(st.stanza("stream:stream", {
65 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
66 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()):gsub(">", "/>")));
67
68 (session.log or log)("debug", "Sent reply <stream:stream> to client");
69 session.notopen = nil;
70
71 -- If session.secure is *false* (not nil) then it means we /were/ encrypting
72 -- since we now have a new stream header, session is secured
73 if session.secure == false then
74 session.secure = true;
75 end
76
77 local features = st.stanza("stream:features");
78 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
79 module:fire_event("stream-features", session, features);
80
81 send(features);
82 end
83
84 function stream_callbacks.streamclosed(session)
85 session.log("debug", "Received </stream:stream>");
86 session:close();
87 end
88
19 function stream_callbacks.error(session, error, data) 89 function stream_callbacks.error(session, error, data)
20 if error == "no-stream" then 90 if error == "no-stream" then
21 session.log("debug", "Invalid opening stream header"); 91 session.log("debug", "Invalid opening stream header");
22 session:close("invalid-namespace"); 92 session:close("invalid-namespace");
23 elseif session.close then 93 elseif error == "parse-error" then
24 (session.log or log)("debug", "Client XML parse error: %s", tostring(error)); 94 (session.log or log)("debug", "Client XML parse error: %s", tostring(data));
25 session:close("xml-not-well-formed"); 95 session:close("not-well-formed");
26 end 96 elseif error == "stream-error" then
27 end 97 local condition, text = "undefined-condition";
28 98 for child in data:children() do
29 99 if child.attr.xmlns == xmlns_xmpp_streams then
30 local function session_reset_stream(session) 100 if child.name ~= "text" then
31 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); 101 condition = child.name;
32 session.parser = parser; 102 else
33 103 text = child:get_text();
34 session.notopen = true; 104 end
35 105 if condition ~= "undefined-condition" and text then
36 function session.data(conn, data) 106 break;
37 data, _ = data:gsub("[%z\255]", "") 107 end
38 log("debug", "Parsing: %s", data) 108 end
39 109 end
40 local ok, err = parser:parse(data) 110 text = condition .. (text and (" ("..text..")") or "");
41 if not ok then 111 session.log("info", "Session closed by remote with error: %s", text);
42 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, 112 session:close(nil, text);
43 data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); 113 end
44 session:close("xml-not-well-formed"); 114 end
45 end 115
46 end 116 local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end
47 end 117 function stream_callbacks.handlestanza(session, stanza)
48 118 stanza = session.filter("stanzas/in", stanza);
49 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 119 if stanza then
50 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; 120 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
121 end
122 end
123
124 --- Session methods
51 local function session_close(session, reason) 125 local function session_close(session, reason)
52 local log = session.log or log; 126 local log = session.log or log;
53 if session.conn then 127 if session.conn then
54 if session.notopen then 128 if session.notopen then
55 session.send("<?xml version='1.0'?>"); 129 session.send("<?xml version='1.0'?>");
76 end 150 end
77 end 151 end
78 end 152 end
79 session.send("</stream:stream>"); 153 session.send("</stream:stream>");
80 session.conn:close(); 154 session.conn:close();
81 websocket_listener.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed"); 155 listener.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed");
82 end 156 end
83 end 157 end
84 158
85 159 --- Port listener
86 local websocket_listener = { default_mode = "*a" }; 160 function listener.onconnect(conn)
87 function websocket_listener.onincoming(conn, data) 161 local session = sm_new_session(conn);
162 sessions[conn] = session;
163
164 session.log("info", "Client connected");
165
166 -- Client is using legacy SSL (otherwise mod_tls sets this flag)
167 if conn:ssl() then
168 session.secure = true;
169 end
170
171 if opt_keepalives then
172 conn:setoption("keepalive", opt_keepalives);
173 end
174
175 session.close = session_close;
176
177 local stream = new_xmpp_stream(session, stream_callbacks);
178 session.stream = stream;
179 session.notopen = true;
180
181 function session.reset_stream()
182 session.notopen = true;
183 session.stream:reset();
184 end
185
186 local filter = session.filter;
187 function session.data(data)
188 local off = 0;
189 local len = string.byte(data, 2) - 0x80;
190 if len == 126 then
191 off = 2;
192 elseif len ==127 then
193 off = 8;
194 end
195 local key = {string.byte(data, off+3), string.byte(data, off+4), string.byte(data, off+5), string.byte(data, off+6)}
196 local decoded = "";
197 local counter = 0;
198 for i = off+7, #data do
199 decoded = decoded .. string.char(bxor(key[counter+1], string.byte(data, i)));
200 counter = (counter + 1) % 4;
201 end
202 module:log("debug", "Websocket received: %s %i", decoded, #decoded)
203 decoded = decoded:gsub("/>$", ">");
204
205 data = filter("bytes/in", decoded);
206 if data then
207 local ok, err = stream:feed(data);
208 if ok then return; end
209 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
210 session:close("not-well-formed");
211 end
212 end
213
214 function session.send(s)
215 s = tostring(s);
216 local len = #s;
217 if len < 126 then
218 conn:write("\x81" .. string.char(len) .. s);
219 elseif len <= 0xffff then
220 conn:write("\x81" .. string.char(126) .. string.char(len/0x100) .. string.char(len%0x100) .. s);
221 else
222 conn:write("\x81" .. string.char(127) .. string.char(len/0x100000000000000)
223 .. string.char((len%0x100000000000000)/0x1000000000000) .. string.char((len%0x1000000000000)/0x10000000000)
224 .. string.char((len%0x10000000000)/0x100000000) .. string.char((len%0x100000000)/0x1000000)
225 .. string.char((len%0x1000000)/0x10000) .. string.char((len%0x10000)/0x100)
226 .. string.char((len%0x100)))
227 end
228 end
229
230 if c2s_timeout then
231 add_task(c2s_timeout, function ()
232 if session.type == "c2s_unauthed" then
233 session:close("connection-timeout");
234 end
235 end);
236 end
237
238 session.dispatch_stanza = stream_callbacks.handlestanza;
239 end
240
241 function listener.onincoming(conn, data)
88 local session = sessions[conn]; 242 local session = sessions[conn];
89 if not session then 243 if session then
90 session = { type = "c2s_unauthed", 244 session.data(data);
91 conn = conn, 245 else
92 reset_stream = session_reset_stream, 246 listener.onconnect(conn, data);
93 close = session_close, 247 session = sessions[conn];
94 dispatch_stanza = stream_callbacks.handlestanza, 248 session.data(data);
95 log = logger.init("websocket"), 249 end
96 secure = conn.ssl }; 250 end
97 251
98 function session.send(s) 252 function listener.ondisconnect(conn, err)
99 conn:write("\00" .. tostring(s) .. "\255");
100 end
101
102 sessions[conn] = session;
103 end
104
105 session_reset_stream(session);
106
107 if data then
108 session.data(conn, data);
109 end
110 end
111
112 function websocket_listener.ondisconnect(conn, err)
113 local session = sessions[conn]; 253 local session = sessions[conn];
114 if session then 254 if session then
115 (session.log or log)("info", "Client disconnected: %s", err); 255 (session.log or log)("info", "Client disconnected: %s", err);
116 sm.destroy_session(session, err); 256 sm_destroy_session(session, err);
117 sessions[conn] = nil; 257 sessions[conn] = nil;
118 session = nil; 258 session = nil;
119 end 259 end
120 end 260 end
121 261
122 262 function listener.associate_session(conn, session)
123 function handle_request(method, body, request) 263 sessions[conn] = session;
124 if request.method ~= "GET" or request.headers["upgrade"] ~= "WebSocket" or request.headers["connection"] ~= "Upgrade" then 264 end
125 if request.method == "OPTIONS" then 265
126 return { headers = default_headers, body = "" }; 266 function handle_request(event, path)
127 else 267 local request, response = event.request, event.response;
128 return "<html><body>You really don't look like a Websocket client to me... what do you want?</body></html>"; 268
129 end 269 -- Add sanity checks
130 end 270
131 271 response.conn:setlistener(listener);
132 local subprotocol = request.headers["Websocket-Protocol"]; 272 response.status = "101 Switching Protocols";
133 if subprotocol ~= nil and subprotocol ~= "XMPP" then 273 response.headers.Upgrade = "websocket";
134 return "<html><body>You really don't look like an XMPP Websocket client to me... what do you want?</body></html>"; 274 response.headers.Connection = "Upgrade";
135 end 275 response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
136 276 response.headers.Sec_WebSocket_Protocol = "xmpp";
137 if not method then 277
138 log("debug", "Request %s suffered error %s", tostring(request.id), body); 278 return "";
139 return; 279 end
140 end 280
141 281 function module.load()
142 request.conn:setlistener(websocket_listener); 282 module:provides("http", {
143 request.write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"); 283 name = "xmpp-websocket";
144 request.write("Upgrade: WebSocket\r\n"); 284 route = {
145 request.write("Connection: Upgrade\r\n"); 285 ["GET /*"] = handle_request;
146 request.write("WebSocket-Origin: file://\r\n"); -- FIXME 286 };
147 request.write("WebSocket-Location: ws://localhost:5281/xmpp-websocket\r\n"); -- FIXME 287 });
148 request.write("WebSocket-Protocol: XMPP\r\n"); 288 end
149 request.write("\r\n");
150
151 return true;
152 end
153
154 local function setup()
155 local ports = module:get_option("websocket_ports") or { 5281 };
156 httpserver.new_from_config(ports, handle_request, { base = "xmpp-websocket" });
157 end
158 if prosody.start_time then -- already started
159 setup();
160 else
161 prosody.events.add_handler("server-started", setup);
162 end