Comparison

plugins/mod_bosh.lua @ 5046:16c7b510694b

mod_bosh: Correctly handle data included in the session initiation request, and cork session while a request is being processed, preventing replying to requests when there may be more data to come, reducing round-trips.
author Matthew Wild <mwild1@gmail.com>
date Sun, 29 Jul 2012 01:56:45 +0100
parent 5031:28d56268a72d
child 5070:4bf6bd22ad11
comparison
equal deleted inserted replaced
5045:4ba6940deed0 5046:16c7b510694b
240 end 240 end
241 241
242 -- New session 242 -- New session
243 sid = new_uuid(); 243 sid = new_uuid();
244 local session = { 244 local session = {
245 type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, 245 type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
246 bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, 246 bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
247 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, 247 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
248 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, 248 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
249 close = bosh_close_stream, dispatch_stanza = core_process_stanza, 249 close = bosh_close_stream, dispatch_stanza = core_process_stanza,
250 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, 250 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
252 }; 252 };
253 sessions[sid] = session; 253 sessions[sid] = session;
254 254
255 session.log("debug", "BOSH session created for request from %s", session.ip); 255 session.log("debug", "BOSH session created for request from %s", session.ip);
256 log("info", "New BOSH session, assigned it sid '%s'", sid); 256 log("info", "New BOSH session, assigned it sid '%s'", sid);
257
258 -- Send creation response
259 local creating_session = true;
260 local features = st.stanza("stream:features");
261 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
262 fire_event("stream-features", session, features);
263 table.insert(session.send_buffer, tostring(features));
264
257 local r = session.requests; 265 local r = session.requests;
258 function session.send(s) 266 function session.send(s)
259 -- We need to ensure that outgoing stanzas have the jabber:client xmlns 267 -- We need to ensure that outgoing stanzas have the jabber:client xmlns
260 if s.attr and not s.attr.xmlns then 268 if s.attr and not s.attr.xmlns then
261 s = st.clone(s); 269 s = st.clone(s);
262 s.attr.xmlns = "jabber:client"; 270 s.attr.xmlns = "jabber:client";
263 end 271 end
264 --log("debug", "Sending BOSH data: %s", tostring(s)); 272 --log("debug", "Sending BOSH data: %s", tostring(s));
273 t_insert(session.send_buffer, tostring(s));
274
265 local oldest_request = r[1]; 275 local oldest_request = r[1];
266 if oldest_request then 276 if oldest_request and not session.bosh_processing then
267 log("debug", "We have an open request, so sending on that"); 277 log("debug", "We have an open request, so sending on that");
268 oldest_request.headers = default_headers; 278 oldest_request.headers = default_headers;
269 oldest_request:send(t_concat({ 279 local body_attr = { xmlns = "http://jabber.org/protocol/httpbind",
270 "<body xmlns='http://jabber.org/protocol/httpbind' ", 280 ["xmlns:stream"] = "http://etherx.jabber.org/streams";
271 session.bosh_terminate and "type='terminate' " or "", 281 type = session.bosh_terminate and "terminate" or nil;
272 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", 282 sid = sid;
273 tostring(s), 283 };
274 "</body>" 284 if creating_session then
275 })); 285 body_attr.wait = attr.wait;
276 elseif s ~= "" then 286 body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
277 log("debug", "Saved to send buffer because there are %d open requests", #r); 287 body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
278 -- Hmm, no requests are open :( 288 body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
279 t_insert(session.send_buffer, tostring(s)); 289 body_attr.hold = tostring(session.bosh_hold);
280 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); 290 body_attr.authid = sid;
291 body_attr.secure = "true";
292 body_attr.ver = '1.6'; from = session.host;
293 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
294 body_attr["xmpp:version"] = "1.0";
295 end
296 oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>");
297 session.send_buffer = {};
281 end 298 end
282 return true; 299 return true;
283 end 300 end
284
285 -- Send creation response
286
287 local features = st.stanza("stream:features");
288 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
289 fire_event("stream-features", session, features);
290 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
291 local body = st.stanza("body", { xmlns = xmlns_bosh,
292 wait = attr.wait,
293 inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
294 polling = tostring(BOSH_DEFAULT_POLLING),
295 requests = tostring(BOSH_DEFAULT_REQUESTS),
296 hold = tostring(session.bosh_hold),
297 sid = sid, authid = sid,
298 ver = '1.6', from = session.host,
299 secure = 'true', ["xmpp:version"] = "1.0",
300 ["xmlns:xmpp"] = "urn:xmpp:xbosh",
301 ["xmlns:stream"] = "http://etherx.jabber.org/streams"
302 }):add_child(features);
303 response.headers = default_headers;
304 response:send(tostring(body));
305
306 request.sid = sid; 301 request.sid = sid;
307 return; 302 return;
308 end 303 end
309 304
310 local session = sessions[sid]; 305 local session = sessions[sid];
341 end 336 end
342 337
343 context.notopen = nil; -- Signals that we accept this opening tag 338 context.notopen = nil; -- Signals that we accept this opening tag
344 t_insert(session.requests, response); 339 t_insert(session.requests, response);
345 context.sid = sid; 340 context.sid = sid;
341 session.bosh_processing = true; -- Used to suppress replies until processing of this request is done
346 342
347 if session.notopen then 343 if session.notopen then
348 local features = st.stanza("stream:features"); 344 local features = st.stanza("stream:features");
349 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); 345 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
350 fire_event("stream-features", session, features); 346 fire_event("stream-features", session, features);
351 session.send(features); 347 table.insert(session.send_buffer, tostring(features));
352 session.notopen = nil; 348 session.notopen = nil;
353 end 349 end
354 end 350 end
355 351
356 function stream_callbacks.handlestanza(context, stanza) 352 function stream_callbacks.handlestanza(context, stanza)
360 if session then 356 if session then
361 if stanza.attr.xmlns == xmlns_bosh then 357 if stanza.attr.xmlns == xmlns_bosh then
362 stanza.attr.xmlns = nil; 358 stanza.attr.xmlns = nil;
363 end 359 end
364 core_process_stanza(session, stanza); 360 core_process_stanza(session, stanza);
361 end
362 end
363
364 function stream_callbacks.streamclosed(request)
365 local session = sessions[request.sid];
366 if session then
367 session.bosh_processing = false;
368 if #session.send_buffer > 0 then
369 session.send("");
370 end
365 end 371 end
366 end 372 end
367 373
368 function stream_callbacks.error(context, error) 374 function stream_callbacks.error(context, error)
369 log("debug", "Error parsing BOSH request payload; %s", error); 375 log("debug", "Error parsing BOSH request payload; %s", error);