Comparison

plugins/mod_bosh.lua @ 7654:b40776ee2aef

Merge 0.10->trunk
author Matthew Wild <mwild1@gmail.com>
date Fri, 02 Sep 2016 23:00:43 +0100
parent 7506:8cca24bea3e0
parent 7653:17e42f793341
child 7655:132819f409dc
comparison
equal deleted inserted replaced
7650:4c2407422f71 7654:b40776ee2aef
29 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) 29 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send)
30 30
31 local stream_callbacks = { 31 local stream_callbacks = {
32 stream_ns = xmlns_bosh, stream_tag = "body", default_ns = "jabber:client" }; 32 stream_ns = xmlns_bosh, stream_tag = "body", default_ns = "jabber:client" };
33 33
34 local BOSH_DEFAULT_HOLD = module:get_option_number("bosh_default_hold", 1); 34 -- These constants are implicitly assumed within the code, and cannot be changed
35 local BOSH_DEFAULT_INACTIVITY = module:get_option_number("bosh_max_inactivity", 60); 35 local BOSH_HOLD = 1;
36 local BOSH_DEFAULT_POLLING = module:get_option_number("bosh_max_polling", 5); 36 local BOSH_MAX_REQUESTS = 2;
37 local BOSH_DEFAULT_REQUESTS = module:get_option_number("bosh_max_requests", 2); 37
38 -- The number of seconds a BOSH session should remain open with no requests
39 local bosh_max_inactivity = module:get_option_number("bosh_max_inactivity", 60);
40 -- The minimum amount of time between requests with no payload
41 local bosh_max_polling = module:get_option_number("bosh_max_polling", 5);
42 -- The maximum amount of time that the server will hold onto a request before replying
43 -- (the client can set this to a lower value when it connects, if it chooses)
38 local bosh_max_wait = module:get_option_number("bosh_max_wait", 120); 44 local bosh_max_wait = module:get_option_number("bosh_max_wait", 120);
39 45
40 local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure"); 46 local consider_bosh_secure = module:get_option_boolean("consider_bosh_secure");
41 local cross_domain = module:get_option("cross_domain_bosh", false); 47 local cross_domain = module:get_option("cross_domain_bosh", false);
42 48
165 session.bosh_wait_timer:stop(); 171 session.bosh_wait_timer:stop();
166 session.bosh_wait_timer = nil; 172 session.bosh_wait_timer = nil;
167 end 173 end
168 174
169 local r = session.requests; 175 local r = session.requests;
170 log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold); 176 log("debug", "Session %s has %d out of %d requests open", context.sid, #r, BOSH_HOLD);
171 log("debug", "and there are %d things in the send_buffer:", #session.send_buffer); 177 log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
172 if #r > session.bosh_hold then 178 if #r > BOSH_HOLD then
173 -- We are holding too many requests, send what's in the buffer, 179 -- We are holding too many requests, send what's in the buffer,
174 log("debug", "We are holding too many requests, so..."); 180 log("debug", "We are holding too many requests, so...");
175 if #session.send_buffer > 0 then 181 if #session.send_buffer > 0 then
176 log("debug", "...sending what is in the buffer") 182 log("debug", "...sending what is in the buffer")
177 session.send(t_concat(session.send_buffer)); 183 session.send(t_concat(session.send_buffer));
301 -- New session 307 -- New session
302 sid = new_uuid(); 308 sid = new_uuid();
303 local session = { 309 local session = {
304 type = "c2s_unauthed", conn = {}, sid = sid, rid = rid, host = attr.to, 310 type = "c2s_unauthed", conn = {}, sid = sid, rid = rid, host = attr.to,
305 bosh_version = attr.ver, bosh_wait = wait, streamid = sid, 311 bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
306 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, 312 bosh_max_inactive = bosh_max_inactivity,
307 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, 313 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
308 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, 314 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
309 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, 315 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
310 ip = get_ip_from_request(request); 316 ip = get_ip_from_request(request);
311 }; 317 };
345 type = session.bosh_terminate and "terminate" or nil; 351 type = session.bosh_terminate and "terminate" or nil;
346 sid = sid; 352 sid = sid;
347 }; 353 };
348 if creating_session then 354 if creating_session then
349 creating_session = nil; 355 creating_session = nil;
350 body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY); 356 body_attr.requests = tostring(BOSH_MAX_REQUESTS);
351 body_attr.polling = tostring(BOSH_DEFAULT_POLLING); 357 body_attr.hold = tostring(BOSH_HOLD);
352 body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS); 358 body_attr.inactivity = tostring(bosh_max_inactivity);
359 body_attr.polling = tostring(bosh_max_polling);
353 body_attr.wait = tostring(session.bosh_wait); 360 body_attr.wait = tostring(session.bosh_wait);
354 body_attr.hold = tostring(session.bosh_hold);
355 body_attr.authid = sid; 361 body_attr.authid = sid;
356 body_attr.secure = "true"; 362 body_attr.secure = "true";
357 body_attr.ver = '1.6'; 363 body_attr.ver = '1.6';
358 body_attr.from = session.host; 364 body_attr.from = session.host;
359 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; 365 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
377 end 383 end
378 384
379 if session.rid then 385 if session.rid then
380 local rid = tonumber(attr.rid); 386 local rid = tonumber(attr.rid);
381 local diff = rid - session.rid; 387 local diff = rid - session.rid;
382 if diff > 1 then 388 -- Diff should be 1 for a healthy request
383 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); 389 if diff ~= 1 then
384 elseif diff <= 0 then 390 context.sid = sid;
385 -- Repeated, ignore
386 session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
387 context.notopen = nil; 391 context.notopen = nil;
392 if diff == 2 then
393 -- Hold request, but don't process it (ouch!)
394 session.log("debug", "rid skipped: %d, deferring this request", rid-1)
395 context.defer = true;
396 session.bosh_deferred = { context = context, sid = sid, rid = rid, terminate = attr.type == "terminate" };
397 return;
398 end
388 context.ignore = true; 399 context.ignore = true;
389 context.sid = sid; 400 if diff == 0 then
390 t_insert(session.requests, response); 401 -- Re-send previous response, ignore stanzas in this request
402 session.log("debug", "rid repeated, ignoring: %s (diff %d)", session.rid, diff);
403 response:send(session.bosh_last_response);
404 return;
405 end
406 -- Session broken, destroy it
407 session.log("debug", "rid out of range: %d (diff %d)", rid, diff);
408 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
391 return; 409 return;
392 end 410 end
393 session.rid = rid; 411 session.rid = rid;
394 end 412 end
395 413
424 local session = sessions[context.sid]; 442 local session = sessions[context.sid];
425 if session then 443 if session then
426 if stanza.attr.xmlns == xmlns_bosh then 444 if stanza.attr.xmlns == xmlns_bosh then
427 stanza.attr.xmlns = nil; 445 stanza.attr.xmlns = nil;
428 end 446 end
429 stanza = session.filter("stanzas/in", stanza); 447 if context.defer and session.bosh_deferred then
430 session.thread:run(stanza); 448 log("debug", "Deferring this stanza");
449 t_insert(session.bosh_deferred, stanza);
450 else
451 stanza = session.filter("stanzas/in", stanza);
452 session.thread:run(stanza);
453 end
454 else
455 log("debug", "No session for this stanza! (sid: %s)", context.sid or "none!");
431 end 456 end
432 end 457 end
433 458
434 function stream_callbacks.streamclosed(context) 459 function stream_callbacks.streamclosed(context)
435 local session = sessions[context.sid]; 460 local session = sessions[context.sid];
436 if session then 461 if session then
462 if not context.defer and session.bosh_deferred then
463 -- Handle deferred stanzas now
464 local deferred_stanzas = session.bosh_deferred;
465 local context = deferred_stanzas.context;
466 session.bosh_deferred = nil;
467 log("debug", "Handling deferred stanzas from rid %d", deferred_stanzas.rid);
468 session.rid = deferred_stanzas.rid;
469 t_insert(session.requests, context.response);
470 for _, stanza in ipairs(deferred_stanzas) do
471 stream_callbacks.handlestanza(context, stanza);
472 end
473 if deferred_stanzas.terminate then
474 session.bosh_terminate = true;
475 end
476 end
437 session.bosh_processing = false; 477 session.bosh_processing = false;
438 if #session.send_buffer > 0 then 478 if #session.send_buffer > 0 then
439 session.send(""); 479 session.send("");
440 end 480 end
441 end 481 end