Software /
code /
prosody-modules
Comparison
mod_websocket/mod_websocket.lua @ 676:54fa9d6d7809
mod_websocket: New mod_c2s based version, still WIP
author | Florian Zeitz <florob@babelmonkeys.de> |
---|---|
date | Fri, 25 May 2012 17:20:41 +0200 |
parent | 129:5fc00a3e47b5 |
child | 677:eeb41cd5e9f3 |
comparison
equal
deleted
inserted
replaced
675:da33325453fb | 676:54fa9d6d7809 |
---|---|
1 module.host = "*" -- Global module | 1 -- Prosody IM |
2 | 2 -- Copyright (C) 2008-2010 Matthew Wild |
3 local logger = require "util.logger"; | 3 -- Copyright (C) 2008-2010 Waqas Hussain |
4 local log = logger.init("mod_websocket"); | 4 -- Copyright (C) 2012 Florian Zeitz |
5 local httpserver = require "net.httpserver"; | 5 -- |
6 local lxp = require "lxp"; | 6 -- This project is MIT/X11 licensed. Please see the |
7 local init_xmlhandlers = require "core.xmlhandlers"; | 7 -- COPYING file in the source package for more information. |
8 -- | |
9 | |
10 module:set_global(); | |
11 | |
12 local add_task = require "util.timer".add_task; | |
13 local new_xmpp_stream = require "util.xmppstream".new; | |
14 local nameprep = require "util.encodings".stringprep.nameprep; | |
15 local sessionmanager = require "core.sessionmanager"; | |
8 local st = require "util.stanza"; | 16 local st = require "util.stanza"; |
9 local sm = require "core.sessionmanager"; | 17 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
10 | 18 local uuid_generate = require "util.uuid".generate; |
11 local sessions = {}; | 19 local sha1 = require "util.hashes".sha1; |
12 local default_headers = { }; | 20 local base64 = require "util.encodings".base64.encode; |
13 | 21 local bxor = require "bit".bxor; |
14 | 22 local tohex = require "bit".tohex; |
15 local stream_callbacks = { default_ns = "jabber:client", | 23 |
16 streamopened = sm.streamopened, | 24 module:depends("http") |
17 streamclosed = sm.streamclosed, | 25 |
18 handlestanza = core_process_stanza }; | 26 |
27 local xpcall, tostring, type = xpcall, tostring, type; | |
28 local traceback = debug.traceback; | |
29 | |
30 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | |
31 | |
32 local log = module._log; | |
33 | |
34 local c2s_timeout = module:get_option_number("c2s_timeout"); | |
35 local opt_keepalives = module:get_option_boolean("tcp_keepalives", false); | |
36 | |
37 local sessions = module:shared("sessions"); | |
38 | |
39 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; | |
40 local listener = {}; | |
41 | |
42 --- Stream events handlers | |
43 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | |
44 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | |
45 | |
46 function stream_callbacks.streamopened(session, attr) | |
47 local send = session.send; | |
48 session.host = nameprep(attr.to); | |
49 if not session.host then | |
50 session:close{ condition = "improper-addressing", | |
51 text = "A valid 'to' attribute is required on stream headers" }; | |
52 return; | |
53 end | |
54 session.version = tonumber(attr.version) or 0; | |
55 session.streamid = uuid_generate(); | |
56 (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host); | |
57 | |
58 if not hosts[session.host] then | |
59 -- We don't serve this host... | |
60 session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)}; | |
61 return; | |
62 end | |
63 | |
64 send("<?xml version='1.0'?>"..(tostring(st.stanza("stream:stream", { | |
65 xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams'; | |
66 id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()):gsub(">", "/>"))); | |
67 | |
68 (session.log or log)("debug", "Sent reply <stream:stream> to client"); | |
69 session.notopen = nil; | |
70 | |
71 -- If session.secure is *false* (not nil) then it means we /were/ encrypting | |
72 -- since we now have a new stream header, session is secured | |
73 if session.secure == false then | |
74 session.secure = true; | |
75 end | |
76 | |
77 local features = st.stanza("stream:features"); | |
78 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | |
79 module:fire_event("stream-features", session, features); | |
80 | |
81 send(features); | |
82 end | |
83 | |
84 function stream_callbacks.streamclosed(session) | |
85 session.log("debug", "Received </stream:stream>"); | |
86 session:close(); | |
87 end | |
88 | |
19 function stream_callbacks.error(session, error, data) | 89 function stream_callbacks.error(session, error, data) |
20 if error == "no-stream" then | 90 if error == "no-stream" then |
21 session.log("debug", "Invalid opening stream header"); | 91 session.log("debug", "Invalid opening stream header"); |
22 session:close("invalid-namespace"); | 92 session:close("invalid-namespace"); |
23 elseif session.close then | 93 elseif error == "parse-error" then |
24 (session.log or log)("debug", "Client XML parse error: %s", tostring(error)); | 94 (session.log or log)("debug", "Client XML parse error: %s", tostring(data)); |
25 session:close("xml-not-well-formed"); | 95 session:close("not-well-formed"); |
26 end | 96 elseif error == "stream-error" then |
27 end | 97 local condition, text = "undefined-condition"; |
28 | 98 for child in data:children() do |
29 | 99 if child.attr.xmlns == xmlns_xmpp_streams then |
30 local function session_reset_stream(session) | 100 if child.name ~= "text" then |
31 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); | 101 condition = child.name; |
32 session.parser = parser; | 102 else |
33 | 103 text = child:get_text(); |
34 session.notopen = true; | 104 end |
35 | 105 if condition ~= "undefined-condition" and text then |
36 function session.data(conn, data) | 106 break; |
37 data, _ = data:gsub("[%z\255]", "") | 107 end |
38 log("debug", "Parsing: %s", data) | 108 end |
39 | 109 end |
40 local ok, err = parser:parse(data) | 110 text = condition .. (text and (" ("..text..")") or ""); |
41 if not ok then | 111 session.log("info", "Session closed by remote with error: %s", text); |
42 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, | 112 session:close(nil, text); |
43 data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | 113 end |
44 session:close("xml-not-well-formed"); | 114 end |
45 end | 115 |
46 end | 116 local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end |
47 end | 117 function stream_callbacks.handlestanza(session, stanza) |
48 | 118 stanza = session.filter("stanzas/in", stanza); |
49 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 119 if stanza then |
50 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | 120 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); |
121 end | |
122 end | |
123 | |
124 --- Session methods | |
51 local function session_close(session, reason) | 125 local function session_close(session, reason) |
52 local log = session.log or log; | 126 local log = session.log or log; |
53 if session.conn then | 127 if session.conn then |
54 if session.notopen then | 128 if session.notopen then |
55 session.send("<?xml version='1.0'?>"); | 129 session.send("<?xml version='1.0'?>"); |
76 end | 150 end |
77 end | 151 end |
78 end | 152 end |
79 session.send("</stream:stream>"); | 153 session.send("</stream:stream>"); |
80 session.conn:close(); | 154 session.conn:close(); |
81 websocket_listener.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed"); | 155 listener.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed"); |
82 end | 156 end |
83 end | 157 end |
84 | 158 |
85 | 159 --- Port listener |
86 local websocket_listener = { default_mode = "*a" }; | 160 function listener.onconnect(conn) |
87 function websocket_listener.onincoming(conn, data) | 161 local session = sm_new_session(conn); |
162 sessions[conn] = session; | |
163 | |
164 session.log("info", "Client connected"); | |
165 | |
166 -- Client is using legacy SSL (otherwise mod_tls sets this flag) | |
167 if conn:ssl() then | |
168 session.secure = true; | |
169 end | |
170 | |
171 if opt_keepalives then | |
172 conn:setoption("keepalive", opt_keepalives); | |
173 end | |
174 | |
175 session.close = session_close; | |
176 | |
177 local stream = new_xmpp_stream(session, stream_callbacks); | |
178 session.stream = stream; | |
179 session.notopen = true; | |
180 | |
181 function session.reset_stream() | |
182 session.notopen = true; | |
183 session.stream:reset(); | |
184 end | |
185 | |
186 local filter = session.filter; | |
187 function session.data(data) | |
188 local off = 0; | |
189 local len = string.byte(data, 2) - 0x80; | |
190 if len == 126 then | |
191 off = 2; | |
192 elseif len ==127 then | |
193 off = 8; | |
194 end | |
195 local key = {string.byte(data, off+3), string.byte(data, off+4), string.byte(data, off+5), string.byte(data, off+6)} | |
196 local decoded = ""; | |
197 local counter = 0; | |
198 for i = off+7, #data do | |
199 decoded = decoded .. string.char(bxor(key[counter+1], string.byte(data, i))); | |
200 counter = (counter + 1) % 4; | |
201 end | |
202 module:log("debug", "Websocket received: %s %i", decoded, #decoded) | |
203 decoded = decoded:gsub("/>$", ">"); | |
204 | |
205 data = filter("bytes/in", decoded); | |
206 if data then | |
207 local ok, err = stream:feed(data); | |
208 if ok then return; end | |
209 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | |
210 session:close("not-well-formed"); | |
211 end | |
212 end | |
213 | |
214 function session.send(s) | |
215 s = tostring(s); | |
216 local len = #s; | |
217 if len < 126 then | |
218 conn:write("\x81" .. string.char(len) .. s); | |
219 elseif len <= 0xffff then | |
220 conn:write("\x81" .. string.char(126) .. string.char(len/0x100) .. string.char(len%0x100) .. s); | |
221 else | |
222 conn:write("\x81" .. string.char(127) .. string.char(len/0x100000000000000) | |
223 .. string.char((len%0x100000000000000)/0x1000000000000) .. string.char((len%0x1000000000000)/0x10000000000) | |
224 .. string.char((len%0x10000000000)/0x100000000) .. string.char((len%0x100000000)/0x1000000) | |
225 .. string.char((len%0x1000000)/0x10000) .. string.char((len%0x10000)/0x100) | |
226 .. string.char((len%0x100))) | |
227 end | |
228 end | |
229 | |
230 if c2s_timeout then | |
231 add_task(c2s_timeout, function () | |
232 if session.type == "c2s_unauthed" then | |
233 session:close("connection-timeout"); | |
234 end | |
235 end); | |
236 end | |
237 | |
238 session.dispatch_stanza = stream_callbacks.handlestanza; | |
239 end | |
240 | |
241 function listener.onincoming(conn, data) | |
88 local session = sessions[conn]; | 242 local session = sessions[conn]; |
89 if not session then | 243 if session then |
90 session = { type = "c2s_unauthed", | 244 session.data(data); |
91 conn = conn, | 245 else |
92 reset_stream = session_reset_stream, | 246 listener.onconnect(conn, data); |
93 close = session_close, | 247 session = sessions[conn]; |
94 dispatch_stanza = stream_callbacks.handlestanza, | 248 session.data(data); |
95 log = logger.init("websocket"), | 249 end |
96 secure = conn.ssl }; | 250 end |
97 | 251 |
98 function session.send(s) | 252 function listener.ondisconnect(conn, err) |
99 conn:write("\00" .. tostring(s) .. "\255"); | |
100 end | |
101 | |
102 sessions[conn] = session; | |
103 end | |
104 | |
105 session_reset_stream(session); | |
106 | |
107 if data then | |
108 session.data(conn, data); | |
109 end | |
110 end | |
111 | |
112 function websocket_listener.ondisconnect(conn, err) | |
113 local session = sessions[conn]; | 253 local session = sessions[conn]; |
114 if session then | 254 if session then |
115 (session.log or log)("info", "Client disconnected: %s", err); | 255 (session.log or log)("info", "Client disconnected: %s", err); |
116 sm.destroy_session(session, err); | 256 sm_destroy_session(session, err); |
117 sessions[conn] = nil; | 257 sessions[conn] = nil; |
118 session = nil; | 258 session = nil; |
119 end | 259 end |
120 end | 260 end |
121 | 261 |
122 | 262 function listener.associate_session(conn, session) |
123 function handle_request(method, body, request) | 263 sessions[conn] = session; |
124 if request.method ~= "GET" or request.headers["upgrade"] ~= "WebSocket" or request.headers["connection"] ~= "Upgrade" then | 264 end |
125 if request.method == "OPTIONS" then | 265 |
126 return { headers = default_headers, body = "" }; | 266 function handle_request(event, path) |
127 else | 267 local request, response = event.request, event.response; |
128 return "<html><body>You really don't look like a Websocket client to me... what do you want?</body></html>"; | 268 |
129 end | 269 -- Add sanity checks |
130 end | 270 |
131 | 271 response.conn:setlistener(listener); |
132 local subprotocol = request.headers["Websocket-Protocol"]; | 272 response.status = "101 Switching Protocols"; |
133 if subprotocol ~= nil and subprotocol ~= "XMPP" then | 273 response.headers.Upgrade = "websocket"; |
134 return "<html><body>You really don't look like an XMPP Websocket client to me... what do you want?</body></html>"; | 274 response.headers.Connection = "Upgrade"; |
135 end | 275 response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); |
136 | 276 response.headers.Sec_WebSocket_Protocol = "xmpp"; |
137 if not method then | 277 |
138 log("debug", "Request %s suffered error %s", tostring(request.id), body); | 278 return ""; |
139 return; | 279 end |
140 end | 280 |
141 | 281 function module.load() |
142 request.conn:setlistener(websocket_listener); | 282 module:provides("http", { |
143 request.write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"); | 283 name = "xmpp-websocket"; |
144 request.write("Upgrade: WebSocket\r\n"); | 284 route = { |
145 request.write("Connection: Upgrade\r\n"); | 285 ["GET /*"] = handle_request; |
146 request.write("WebSocket-Origin: file://\r\n"); -- FIXME | 286 }; |
147 request.write("WebSocket-Location: ws://localhost:5281/xmpp-websocket\r\n"); -- FIXME | 287 }); |
148 request.write("WebSocket-Protocol: XMPP\r\n"); | 288 end |
149 request.write("\r\n"); | |
150 | |
151 return true; | |
152 end | |
153 | |
154 local function setup() | |
155 local ports = module:get_option("websocket_ports") or { 5281 }; | |
156 httpserver.new_from_config(ports, handle_request, { base = "xmpp-websocket" }); | |
157 end | |
158 if prosody.start_time then -- already started | |
159 setup(); | |
160 else | |
161 prosody.events.add_handler("server-started", setup); | |
162 end |