Software /
code /
prosody
Comparison
plugins/mod_bosh.lua @ 8747:f91d45a692f0
mod_bosh: Improve connection robustness with better handling of unexpected rids
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Tue, 10 Apr 2018 20:34:29 +0100 |
parent | 8746:df1ca586c68d |
child | 8752:8f2da579a790 |
comparison
equal
deleted
inserted
replaced
8746:df1ca586c68d | 8747:f91d45a692f0 |
---|---|
18 local math_min = math.min; | 18 local math_min = math.min; |
19 local tostring, type = tostring, type; | 19 local tostring, type = tostring, type; |
20 local traceback = debug.traceback; | 20 local traceback = debug.traceback; |
21 local runner = require"util.async".runner; | 21 local runner = require"util.async".runner; |
22 local nameprep = require "util.encodings".stringprep.nameprep; | 22 local nameprep = require "util.encodings".stringprep.nameprep; |
 | 23 local cache = require "util.cache"; |
23 | 24 |
24 local xmlns_streams = "http://etherx.jabber.org/streams"; | 25 local xmlns_streams = "http://etherx.jabber.org/streams"; |
25 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 26 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
26 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) | 27 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) |
27 | 28 |
248 local runner_callbacks = { }; | 249 local runner_callbacks = { }; |
249 | 250 |
250 -- Handle the <body> tag in the request payload. | 251 -- Handle the <body> tag in the request payload. |
251 function stream_callbacks.streamopened(context, attr) | 252 function stream_callbacks.streamopened(context, attr) |
252 local request, response = context.request, context.response; | 253 local request, response = context.request, context.response; |
253 local sid = attr.sid; | 254 local sid, rid = attr.sid, tonumber(attr.rid); |
254 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); | 255 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); |
 | 256 context.rid = rid; |
255 if not sid then | 257 if not sid then |
256 -- New session request | 258 -- New session request |
257 context.notopen = nil; -- Signals that we accept this opening tag | 259 context.notopen = nil; -- Signals that we accept this opening tag |
258 | 260 |
259 local to_host = nameprep(attr.to); | 261 local to_host = nameprep(attr.to); |
260 local rid = tonumber(attr.rid); | |
261 local wait = tonumber(attr.wait); | 262 local wait = tonumber(attr.wait); |
262 if not to_host then | 263 if not to_host then |
263 log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to)); | 264 log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to)); |
264 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", | 265 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", |
265 ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" }); | 266 ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" }); |
276 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" }); | 277 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" }); |
277 response:send(tostring(close_reply)); | 278 response:send(tostring(close_reply)); |
278 return; | 279 return; |
279 end | 280 end |
280 | 281 |
281 rid = rid - 1; | |
282 wait = math_min(wait, bosh_max_wait); | 282 wait = math_min(wait, bosh_max_wait); |
283 | 283 |
284 -- New session | 284 -- New session |
285 sid = new_uuid(); | 285 sid = new_uuid(); |
286 local session = { | 286 local session = { |
287 type = "c2s_unauthed", conn = request.conn, sid = sid, rid = rid, host = attr.to, | 287 type = "c2s_unauthed", conn = request.conn, sid = sid, host = attr.to, |
 | 288 rid = rid - 1, -- Hack for initial session setup, "previous" rid was $current_request - 1 |
288 bosh_version = attr.ver, bosh_wait = wait, streamid = sid, | 289 bosh_version = attr.ver, bosh_wait = wait, streamid = sid, |
289 bosh_max_inactive = bosh_max_inactivity, | 290 bosh_max_inactive = bosh_max_inactivity, bosh_responses = cache.new(BOSH_HOLD):table(); |
290 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, | 291 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, |
291 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, | 292 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, |
292 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, | 293 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, |
293 ip = request.ip; | 294 ip = request.ip; |
294 }; | 295 }; |
340 body_attr.ver = '1.6'; | 341 body_attr.ver = '1.6'; |
341 body_attr.from = session.host; | 342 body_attr.from = session.host; |
342 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; | 343 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; |
343 body_attr["xmpp:version"] = "1.0"; | 344 body_attr["xmpp:version"] = "1.0"; |
344 end | 345 end |
345 session.bosh_last_response = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>"; | 346 local response_xml = st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>"; |
346 oldest_request:send(session.bosh_last_response); | 347 session.bosh_responses[oldest_request.context.rid] = response_xml; |
 | 348 oldest_request:send(response_xml); |
347 session.send_buffer = {}; | 349 session.send_buffer = {}; |
348 end | 350 end |
349 return true; | 351 return true; |
350 end | 352 end |
351 request.sid = sid; | 353 request.sid = sid; |
361 end | 363 end |
362 | 364 |
363 session.conn = request.conn; | 365 session.conn = request.conn; |
364 | 366 |
365 if session.rid then | 367 if session.rid then |
366 local rid = tonumber(attr.rid); | |
367 local diff = rid - session.rid; | 368 local diff = rid - session.rid; |
368 -- Diff should be 1 for a healthy request | 369 -- Diff should be 1 for a healthy request |
 | 370 session.log("debug", "rid: %d, sess: %s, diff: %d", rid, session.rid, diff) |
369 if diff ~= 1 then | 371 if diff ~= 1 then |
370 context.sid = sid; | 372 context.sid = sid; |
371 context.notopen = nil; | 373 context.notopen = nil; |
372 if diff == 2 then -- Missed a request | 374 if diff == 2 then -- Missed a request |
373 -- Hold request, but don't process it (ouch!) | 375 -- Hold request, but don't process it (ouch!) |
377 return; | 379 return; |
378 end | 380 end |
379 -- Set a marker to indicate that stanzas in this request should NOT be processed | 381 -- Set a marker to indicate that stanzas in this request should NOT be processed |
380 -- (these stanzas will already be in the XML parser's buffer) | 382 -- (these stanzas will already be in the XML parser's buffer) |
381 context.ignore = true; | 383 context.ignore = true; |
382 if diff == 0 then | 384 if session.bosh_responses[rid] then |
383 -- Re-send previous response, ignore stanzas in this request | 385 -- Re-send past response, ignore stanzas in this request |
384 session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff); | 386 session.log("debug", "rid repeated within window, replaying old response"); |
385 response:send(session.bosh_last_response); | 387 response:send(session.bosh_responses[rid]); |
 | 388 return; |
 | 389 elseif diff == 0 then |
 | 390 session.log("debug", "current rid repeated, ignoring stanzas"); |
 | 391 t_insert(session.requests, response); |
 | 392 context.sid = sid; |
386 return; | 393 return; |
387 end | 394 end |
388 -- Session broken, destroy it | 395 -- Session broken, destroy it |
389 session.log("debug", "rid out of range: %d (diff %d)", rid, diff); | 396 session.log("debug", "rid out of range: %d (diff %d)", rid, diff); |
390 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); | 397 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); |