Software / code / prosody
Comparison
plugins/mod_bosh.lua @ 5046:16c7b510694b
mod_bosh: Correctly handle data included in the session initiation request, and cork session while a request is being processed, preventing replying to requests when there may be more data to come, reducing round-trips.
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Sun, 29 Jul 2012 01:56:45 +0100 |
| parent | 5031:28d56268a72d |
| child | 5070:4bf6bd22ad11 |
comparison
equal
deleted
inserted
replaced
| 5045:4ba6940deed0 | 5046:16c7b510694b |
|---|---|
| 240 end | 240 end |
| 241 | 241 |
| 242 -- New session | 242 -- New session |
| 243 sid = new_uuid(); | 243 sid = new_uuid(); |
| 244 local session = { | 244 local session = { |
| 245 type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, | 245 type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, |
| 246 bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, | 246 bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, |
| 247 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, | 247 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, |
| 248 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, | 248 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, |
| 249 close = bosh_close_stream, dispatch_stanza = core_process_stanza, | 249 close = bosh_close_stream, dispatch_stanza = core_process_stanza, |
| 250 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, | 250 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, |
| 252 }; | 252 }; |
| 253 sessions[sid] = session; | 253 sessions[sid] = session; |
| 254 | 254 |
| 255 session.log("debug", "BOSH session created for request from %s", session.ip); | 255 session.log("debug", "BOSH session created for request from %s", session.ip); |
| 256 log("info", "New BOSH session, assigned it sid '%s'", sid); | 256 log("info", "New BOSH session, assigned it sid '%s'", sid); |
| 257 | |
| 258 -- Send creation response | |
| 259 local creating_session = true; | |
| 260 local features = st.stanza("stream:features"); | |
| 261 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | |
| 262 fire_event("stream-features", session, features); | |
| 263 table.insert(session.send_buffer, tostring(features)); | |
| 264 | |
| 257 local r = session.requests; | 265 local r = session.requests; |
| 258 function session.send(s) | 266 function session.send(s) |
| 259 -- We need to ensure that outgoing stanzas have the jabber:client xmlns | 267 -- We need to ensure that outgoing stanzas have the jabber:client xmlns |
| 260 if s.attr and not s.attr.xmlns then | 268 if s.attr and not s.attr.xmlns then |
| 261 s = st.clone(s); | 269 s = st.clone(s); |
| 262 s.attr.xmlns = "jabber:client"; | 270 s.attr.xmlns = "jabber:client"; |
| 263 end | 271 end |
| 264 --log("debug", "Sending BOSH data: %s", tostring(s)); | 272 --log("debug", "Sending BOSH data: %s", tostring(s)); |
| 273 t_insert(session.send_buffer, tostring(s)); | |
| 274 | |
| 265 local oldest_request = r[1]; | 275 local oldest_request = r[1]; |
| 266 if oldest_request then | 276 if oldest_request and not session.bosh_processing then |
| 267 log("debug", "We have an open request, so sending on that"); | 277 log("debug", "We have an open request, so sending on that"); |
| 268 oldest_request.headers = default_headers; | 278 oldest_request.headers = default_headers; |
| 269 oldest_request:send(t_concat({ | 279 local body_attr = { xmlns = "http://jabber.org/protocol/httpbind", |
| 270 "<body xmlns='http://jabber.org/protocol/httpbind' ", | 280 ["xmlns:stream"] = "http://etherx.jabber.org/streams"; |
| 271 session.bosh_terminate and "type='terminate' " or "", | 281 type = session.bosh_terminate and "terminate" or nil; |
| 272 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", | 282 sid = sid; |
| 273 tostring(s), | 283 }; |
| 274 "</body>" | 284 if creating_session then |
| 275 })); | 285 body_attr.wait = attr.wait; |
| 276 elseif s ~= "" then | 286 body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY); |
| 277 log("debug", "Saved to send buffer because there are %d open requests", #r); | 287 body_attr.polling = tostring(BOSH_DEFAULT_POLLING); |
| 278 -- Hmm, no requests are open :( | 288 body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS); |
| 279 t_insert(session.send_buffer, tostring(s)); | 289 body_attr.hold = tostring(session.bosh_hold); |
| 280 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); | 290 body_attr.authid = sid; |
| 291 body_attr.secure = "true"; | |
| 292 body_attr.ver = '1.6'; from = session.host; | |
| 293 body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh"; | |
| 294 body_attr["xmpp:version"] = "1.0"; | |
| 295 end | |
| 296 oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>"); | |
| 297 session.send_buffer = {}; | |
| 281 end | 298 end |
| 282 return true; | 299 return true; |
| 283 end | 300 end |
| 284 | |
| 285 -- Send creation response | |
| 286 | |
| 287 local features = st.stanza("stream:features"); | |
| 288 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | |
| 289 fire_event("stream-features", session, features); | |
| 290 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' | |
| 291 local body = st.stanza("body", { xmlns = xmlns_bosh, | |
| 292 wait = attr.wait, | |
| 293 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), | |
| 294 polling = tostring(BOSH_DEFAULT_POLLING), | |
| 295 requests = tostring(BOSH_DEFAULT_REQUESTS), | |
| 296 hold = tostring(session.bosh_hold), | |
| 297 sid = sid, authid = sid, | |
| 298 ver = '1.6', from = session.host, | |
| 299 secure = 'true', ["xmpp:version"] = "1.0", | |
| 300 ["xmlns:xmpp"] = "urn:xmpp:xbosh", | |
| 301 ["xmlns:stream"] = "http://etherx.jabber.org/streams" | |
| 302 }):add_child(features); | |
| 303 response.headers = default_headers; | |
| 304 response:send(tostring(body)); | |
| 305 | |
| 306 request.sid = sid; | 301 request.sid = sid; |
| 307 return; | 302 return; |
| 308 end | 303 end |
| 309 | 304 |
| 310 local session = sessions[sid]; | 305 local session = sessions[sid]; |
| 341 end | 336 end |
| 342 | 337 |
| 343 context.notopen = nil; -- Signals that we accept this opening tag | 338 context.notopen = nil; -- Signals that we accept this opening tag |
| 344 t_insert(session.requests, response); | 339 t_insert(session.requests, response); |
| 345 context.sid = sid; | 340 context.sid = sid; |
| 341 session.bosh_processing = true; -- Used to suppress replies until processing of this request is done | |
| 346 | 342 |
| 347 if session.notopen then | 343 if session.notopen then |
| 348 local features = st.stanza("stream:features"); | 344 local features = st.stanza("stream:features"); |
| 349 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 345 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
| 350 fire_event("stream-features", session, features); | 346 fire_event("stream-features", session, features); |
| 351 session.send(features); | 347 table.insert(session.send_buffer, tostring(features)); |
| 352 session.notopen = nil; | 348 session.notopen = nil; |
| 353 end | 349 end |
| 354 end | 350 end |
| 355 | 351 |
| 356 function stream_callbacks.handlestanza(context, stanza) | 352 function stream_callbacks.handlestanza(context, stanza) |
| 360 if session then | 356 if session then |
| 361 if stanza.attr.xmlns == xmlns_bosh then | 357 if stanza.attr.xmlns == xmlns_bosh then |
| 362 stanza.attr.xmlns = nil; | 358 stanza.attr.xmlns = nil; |
| 363 end | 359 end |
| 364 core_process_stanza(session, stanza); | 360 core_process_stanza(session, stanza); |
| 361 end | |
| 362 end | |
| 363 | |
| 364 function stream_callbacks.streamclosed(request) | |
| 365 local session = sessions[request.sid]; | |
| 366 if session then | |
| 367 session.bosh_processing = false; | |
| 368 if #session.send_buffer > 0 then | |
| 369 session.send(""); | |
| 370 end | |
| 365 end | 371 end |
| 366 end | 372 end |
| 367 | 373 |
| 368 function stream_callbacks.error(context, error) | 374 function stream_callbacks.error(context, error) |
| 369 log("debug", "Error parsing BOSH request payload; %s", error); | 375 log("debug", "Error parsing BOSH request payload; %s", error); |