Changeset

1340:3ffd64b4ab59

mod_websocket: Update to draft-ietf-xmpp-websocket-01
author Florian Zeitz <florob@babelmonkeys.de>
date Sun, 09 Mar 2014 23:35:57 +0100
parents 1337:c38f163f18b9
children 1341:f5c256a5f209
files mod_websocket/mod_websocket.lua
diffstat 1 files changed, 107 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/mod_websocket/mod_websocket.lua	Sun Mar 09 14:09:24 2014 +0100
+++ b/mod_websocket/mod_websocket.lua	Sun Mar 09 23:35:57 2014 +0100
@@ -11,7 +11,11 @@
 local sha1 = require "util.hashes".sha1;
 local base64 = require "util.encodings".base64.encode;
 local softreq = require "util.dependencies".softreq;
+local st = require "util.stanza";
+local parse_xml = require "util.xml".parse;
 local portmanager = require "core.portmanager";
+local sm_destroy_session = sessionmanager.destroy_session;
+local log = module._log;
 
 local bit;
 pcall(function() bit = require"bit"; end);
@@ -38,6 +42,10 @@
 	end
 end
 
+local xmlns_framing = "urn:ietf:params:xml:ns:xmpp-framing";
+local xmlns_streams = "http://etherx.jabber.org/streams";
+local xmlns_client = "jabber:client";
+
 module:depends("c2s")
 local sessions = module:shared("c2s/sessions");
 local c2s_listener = portmanager.get_service("c2s").listener;
@@ -128,7 +136,93 @@
 	return t_concat(result, "");
 end
 
+--- Session methods
+local function session_open_stream(session)
+	local attr = {
+		xmlns = xmlns_framing,
+		version = "1.0",
+		id = session.streamid or "",
+		from = session.host
+	};
+	session.send(st.stanza("open", attr));
+end
+
+local function session_close(session, reason)
+	local log = session.log or log;
+	if session.conn then
+		if session.notopen then
+			session:open_stream();
+		end
+		if reason then -- nil == no err, initiated by us, false == initiated by client
+			local stream_error = st.stanza("stream:error");
+			if type(reason) == "string" then -- assume stream error
+				stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
+			elseif type(reason) == "table" then
+				if reason.condition then
+					stream_error:tag(reason.condition, stream_xmlns_attr):up();
+					if reason.text then
+						stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
+					end
+					if reason.extra then
+						stream_error:add_child(reason.extra);
+					end
+				elseif reason.name then -- a stanza
+					stream_error = reason;
+				end
+			end
+			stream_error = tostring(stream_error);
+			log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
+			session.send(stream_error);
+		end
+
+		session.send(st.stanza("close", { xmlns = xmlns_framing }));
+		function session.send() return false; end
+
+		local reason = (reason and (reason.name or reason.text or reason.condition)) or reason;
+		session.log("debug", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
+
+		-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
+		local conn = session.conn;
+		if reason == nil and not session.notopen and session.type == "c2s" then
+			-- Grace time to process data from authenticated cleanly-closed stream
+			add_task(stream_close_timeout, function ()
+				if not session.destroyed then
+					session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
+					sm_destroy_session(session, reason);
+					-- Sends close with code 1000 and message "Stream closed"
+					local data = s_char(0x03) .. s_char(0xe8) .. "Stream closed";
+					conn:write(build_frame({opcode = 0x8, FIN = true, data = data}));
+					conn:close();
+				end
+			end);
+		else
+			sm_destroy_session(session, reason);
+			-- Sends close with code 1000 and message "Stream closed"
+			local data = s_char(0x03) .. s_char(0xe8) .. "Stream closed";
+			conn:write(build_frame({opcode = 0x8, FIN = true, data = data}));
+			conn:close();
+		end
+	end
+end
+
+
 --- Filter stuff
+local function filter_open_close(data)
+	if not data:find(xmlns_framing, 1, true) then return data; end
+
+	local oc = parse_xml(data);
+	if not oc then return data; end
+	if oc.attr.xmlns ~= xmlns_framing then return data; end
+	if oc.name == "close" then return "</stream:stream>"; end
+	if oc.name == "open" then
+		oc.name = "stream:stream";
+		oc.attr.xmlns = nil;
+		oc.attr["xmlns:stream"] = xmlns_streams;
+		return oc:top_tag();
+	end
+
+	return data;
+end
 function handle_request(event, path)
 	local request, response = event.request, event.response;
 	local conn = response.conn;
@@ -245,6 +339,9 @@
 
 	session.secure = consider_websocket_secure or session.secure;
 
+	session.open_stream = session_open_stream;
+	session.close = session_close;
+
 	local frameBuffer = "";
 	add_filter(session, "bytes/in", function(data)
 		local cache = {};
@@ -255,7 +352,7 @@
 			frameBuffer = frameBuffer:sub(length + 1);
 			local result = handle_frame(frame);
 			if not result then return; end
-			cache[#cache+1] = result;
+			cache[#cache+1] = filter_open_close(result);
 			frame, length = parse_frame(frameBuffer);
 		end
 		return t_concat(cache, "");
@@ -265,6 +362,15 @@
 		return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
 	end);
 
+	add_filter(session, "stanzas/out", function(stanza)
+		local attr = stanza.attr;
+		attr.xmlns = attr.xmlns or xmlns_client;
+		if stanza.name:find("^stream:") then
+			attr["xmlns:stream"] = attr["xmlns:stream"] or xmlns_streams;
+		end
+		return stanza;
+	end);
+
 	response.status_code = 101;
 	response.headers.upgrade = "websocket";
 	response.headers.connection = "Upgrade";