Comparison

plugins/mod_bosh.lua @ 8747:f91d45a692f0

mod_bosh: Improve connection robustness with better handling of unexpected rids
author Matthew Wild <mwild1@gmail.com>
date Tue, 10 Apr 2018 20:34:29 +0100
parent 8746:df1ca586c68d
child 8752:8f2da579a790
comparison
equal deleted inserted replaced
8746:df1ca586c68d 8747:f91d45a692f0
18 local math_min = math.min; 18 local math_min = math.min;
19 local tostring, type = tostring, type; 19 local tostring, type = tostring, type;
20 local traceback = debug.traceback; 20 local traceback = debug.traceback;
21 local runner = require"util.async".runner; 21 local runner = require"util.async".runner;
22 local nameprep = require "util.encodings".stringprep.nameprep; 22 local nameprep = require "util.encodings".stringprep.nameprep;
23 local cache = require "util.cache";
23 24
24 local xmlns_streams = "http://etherx.jabber.org/streams"; 25 local xmlns_streams = "http://etherx.jabber.org/streams";
25 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 26 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
26 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) 27 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send)
27 28
248 local runner_callbacks = { }; 249 local runner_callbacks = { };
249 250
250 -- Handle the <body> tag in the request payload. 251 -- Handle the <body> tag in the request payload.
251 function stream_callbacks.streamopened(context, attr) 252 function stream_callbacks.streamopened(context, attr)
252 local request, response = context.request, context.response; 253 local request, response = context.request, context.response;
253 local sid = attr.sid; 254 local sid, rid = attr.sid, tonumber(attr.rid);
254 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); 255 log("debug", "BOSH body open (sid: %s)", sid or "<none>");
256 context.rid = rid;
255 if not sid then 257 if not sid then
256 -- New session request 258 -- New session request
257 context.notopen = nil; -- Signals that we accept this opening tag 259 context.notopen = nil; -- Signals that we accept this opening tag
258 260
259 local to_host = nameprep(attr.to); 261 local to_host = nameprep(attr.to);
260 local rid = tonumber(attr.rid);
261 local wait = tonumber(attr.wait); 262 local wait = tonumber(attr.wait);
262 if not to_host then 263 if not to_host then
263 log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to)); 264 log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to));
264 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", 265 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
265 ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" }); 266 ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" });
276 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" }); 277 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
277 response:send(tostring(close_reply)); 278 response:send(tostring(close_reply));
278 return; 279 return;
279 end 280 end
280 281
281 rid = rid - 1;
282 wait = math_min(wait, bosh_max_wait); 282 wait = math_min(wait, bosh_max_wait);
283 283
284 -- New session 284 -- New session
285 sid = new_uuid(); 285 sid = new_uuid();
286 local session = { 286 local session = {
287 type = "c2s_unauthed", conn = request.conn, sid = sid, rid = rid, host = attr.to, 287 type = "c2s_unauthed", conn = request.conn, sid = sid, host = attr.to,
288 rid = rid - 1, -- Hack for initial session setup, "previous" rid was $current_request - 1
288 bosh_version = attr.ver, bosh_wait = wait, streamid = sid, 289 bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
289 bosh_max_inactive = bosh_max_inactivity, 290 bosh_max_inactive = bosh_max_inactivity, bosh_responses = cache.new(BOSH_HOLD):table();
290 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, 291 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
291 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, 292 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
292 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, 293 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
293 ip = request.ip; 294 ip = request.ip;
294 }; 295 };
340 body_attr.ver = '1.6'; 341 body_attr.ver = '1.6';
341 body_attr.from = session.host; 342 body_attr.from = session.host;
342 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; 343 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
343 body_attr["xmpp:version"] = "1.0"; 344 body_attr["xmpp:version"] = "1.0";
344 end 345 end
345 session.bosh_last_response = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>"; 346 local response_xml = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>";
346 oldest_request:send(session.bosh_last_response); 347 session.bosh_responses[oldest_request.context.rid] = response_xml;
348 oldest_request:send(response_xml);
347 session.send_buffer = {}; 349 session.send_buffer = {};
348 end 350 end
349 return true; 351 return true;
350 end 352 end
351 request.sid = sid; 353 request.sid = sid;
361 end 363 end
362 364
363 session.conn = request.conn; 365 session.conn = request.conn;
364 366
365 if session.rid then 367 if session.rid then
366 local rid = tonumber(attr.rid);
367 local diff = rid - session.rid; 368 local diff = rid - session.rid;
368 -- Diff should be 1 for a healthy request 369 -- Diff should be 1 for a healthy request
370 session.log("debug", "rid: %d, sess: %s, diff: %d", rid, session.rid, diff)
369 if diff ~= 1 then 371 if diff ~= 1 then
370 context.sid = sid; 372 context.sid = sid;
371 context.notopen = nil; 373 context.notopen = nil;
372 if diff == 2 then -- Missed a request 374 if diff == 2 then -- Missed a request
373 -- Hold request, but don't process it (ouch!) 375 -- Hold request, but don't process it (ouch!)
377 return; 379 return;
378 end 380 end
379 -- Set a marker to indicate that stanzas in this request should NOT be processed 381 -- Set a marker to indicate that stanzas in this request should NOT be processed
380 -- (these stanzas will already be in the XML parser's buffer) 382 -- (these stanzas will already be in the XML parser's buffer)
381 context.ignore = true; 383 context.ignore = true;
382 if diff == 0 then 384 if session.bosh_responses[rid] then
383 -- Re-send previous response, ignore stanzas in this request 385 -- Re-send past response, ignore stanzas in this request
384 session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff); 386 session.log("debug", "rid repeated within window, replaying old response");
385 response:send(session.bosh_last_response); 387 response:send(session.bosh_responses[rid]);
388 return;
389 elseif diff == 0 then
390 session.log("debug", "current rid repeated, ignoring stanzas");
391 t_insert(session.requests, response);
392 context.sid = sid;
386 return; 393 return;
387 end 394 end
388 -- Session broken, destroy it 395 -- Session broken, destroy it
389 session.log("debug", "rid out of range: %d (diff %d)", rid, diff); 396 session.log("debug", "rid out of range: %d (diff %d)", rid, diff);
390 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); 397 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));