Diff

plugins/mod_bosh.lua @ 8747:f91d45a692f0

mod_bosh: Improve connection robustness with better handling of unexpected rids
author Matthew Wild <mwild1@gmail.com>
date Tue, 10 Apr 2018 20:34:29 +0100
parent 8746:df1ca586c68d
child 8752:8f2da579a790
line wrap: on
line diff
--- a/plugins/mod_bosh.lua	Tue Apr 10 20:30:20 2018 +0100
+++ b/plugins/mod_bosh.lua	Tue Apr 10 20:34:29 2018 +0100
@@ -20,6 +20,7 @@
 local traceback = debug.traceback;
 local runner = require"util.async".runner;
 local nameprep = require "util.encodings".stringprep.nameprep;
+local cache = require "util.cache";
 
 local xmlns_streams = "http://etherx.jabber.org/streams";
 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@@ -250,14 +251,14 @@
 -- Handle the <body> tag in the request payload.
 function stream_callbacks.streamopened(context, attr)
 	local request, response = context.request, context.response;
-	local sid = attr.sid;
+	local sid, rid = attr.sid, tonumber(attr.rid);
 	log("debug", "BOSH body open (sid: %s)", sid or "<none>");
+	context.rid = rid;
 	if not sid then
 		-- New session request
 		context.notopen = nil; -- Signals that we accept this opening tag
 
 		local to_host = nameprep(attr.to);
-		local rid = tonumber(attr.rid);
 		local wait = tonumber(attr.wait);
 		if not to_host then
 			log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to));
@@ -278,15 +279,15 @@
 			return;
 		end
 
-		rid = rid - 1;
 		wait = math_min(wait, bosh_max_wait);
 
 		-- New session
 		sid = new_uuid();
 		local session = {
-			type = "c2s_unauthed", conn = request.conn, sid = sid, rid = rid, host = attr.to,
+			type = "c2s_unauthed", conn = request.conn, sid = sid, host = attr.to,
+			rid = rid - 1, -- Hack for initial session setup, "previous" rid was $current_request - 1
 			bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
-			bosh_max_inactive = bosh_max_inactivity,
+			bosh_max_inactive = bosh_max_inactivity, bosh_responses = cache.new(BOSH_HOLD):table();
 			requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
 			close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
 			log = logger.init("bosh"..sid),	secure = consider_bosh_secure or request.secure,
@@ -342,8 +343,9 @@
 					body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
 					body_attr["xmpp:version"] = "1.0";
 				end
-				session.bosh_last_response = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>";
-				oldest_request:send(session.bosh_last_response);
+				local response_xml = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>";
+				session.bosh_responses[oldest_request.context.rid] = response_xml;
+				oldest_request:send(response_xml);
 				session.send_buffer = {};
 			end
 			return true;
@@ -363,9 +365,9 @@
 	session.conn = request.conn;
 
 	if session.rid then
-		local rid = tonumber(attr.rid);
 		local diff = rid - session.rid;
 		-- Diff should be 1 for a healthy request
+		session.log("debug", "rid: %d, sess: %s, diff: %d", rid, session.rid, diff)
 		if diff ~= 1 then
 			context.sid = sid;
 			context.notopen = nil;
@@ -379,10 +381,15 @@
 			-- Set a marker to indicate that stanzas in this request should NOT be processed
 			-- (these stanzas will already be in the XML parser's buffer)
 			context.ignore = true;
-			if diff == 0 then
-				-- Re-send previous response, ignore stanzas in this request
-				session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
-				response:send(session.bosh_last_response);
+			if session.bosh_responses[rid] then
+				-- Re-send past response, ignore stanzas in this request
+				session.log("debug", "rid repeated within window, replaying old response");
+				response:send(session.bosh_responses[rid]);
+				return;
+			elseif diff == 0 then
+				session.log("debug", "current rid repeated, ignoring stanzas");
+				t_insert(session.requests, response);
+				context.sid = sid;
 				return;
 			end
 			-- Session broken, destroy it