Software / code / prosody
Comparison
plugins/mod_bosh.lua @ 4692:8e7c683d78ca
mod_bosh: Large commit to update to mod_http/net.http.server APIs. Becomes a shared module.
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Wed, 25 Apr 2012 23:10:32 +0100 |
| parent | 4690:55f690fdc915 |
| child | 4697:778eb9405a98 |
comparison
equal
deleted
inserted
replaced
| 4691:a164fc7057ae | 4692:8e7c683d78ca |
|---|---|
| 77 local inactive_sessions = {}; -- Sessions which have no open requests | 77 local inactive_sessions = {}; -- Sessions which have no open requests |
| 78 | 78 |
| 79 -- Used to respond to idle sessions (those with waiting requests) | 79 -- Used to respond to idle sessions (those with waiting requests) |
| 80 local waiting_requests = {}; | 80 local waiting_requests = {}; |
| 81 function on_destroy_request(request) | 81 function on_destroy_request(request) |
| 82 log("debug", "Request destroyed: %s", tostring(request)); | |
| 82 waiting_requests[request] = nil; | 83 waiting_requests[request] = nil; |
| 83 local session = sessions[request.sid]; | 84 local session = sessions[request.context.sid]; |
| 84 if session then | 85 if session then |
| 85 local requests = session.requests; | 86 local requests = session.requests; |
| 86 for i,r in ipairs(requests) do | 87 for i, r in ipairs(requests) do |
| 87 if r == request then | 88 if r == request then |
| 88 t_remove(requests, i); | 89 t_remove(requests, i); |
| 89 break; | 90 break; |
| 90 end | 91 end |
| 91 end | 92 end |
| 97 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); | 98 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); |
| 98 end | 99 end |
| 99 end | 100 end |
| 100 end | 101 end |
| 101 | 102 |
| 102 function handle_request(method, body, request) | 103 local function handle_GET(request) |
| 103 if (not body) or request.method ~= "POST" then | 104 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; |
| 104 if request.method == "OPTIONS" then | 105 end |
| 105 local headers = {}; | 106 |
| 106 for k,v in pairs(default_headers) do headers[k] = v; end | 107 function handle_OPTIONS(request) |
| 107 headers["Content-Type"] = nil; | 108 local headers = {}; |
| 108 return { headers = headers, body = "" }; | 109 for k,v in pairs(default_headers) do headers[k] = v; end |
| 109 else | 110 headers["Content-Type"] = nil; |
| 110 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; | 111 return { headers = headers, body = "" }; |
| 111 end | 112 end |
| 112 end | 113 |
| 113 if not method then | 114 function handle_POST(event) |
| 114 log("debug", "Request %s suffered error %s", tostring(request.id), body); | 115 log("debug", "Handling new request %s: %s\n----------", tostring(event.request), tostring(event.request.body)); |
| 115 return; | 116 |
| 116 end | 117 local request, response = event.request, event.response; |
| 117 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); | 118 response.on_destroy = on_destroy_request; |
| 118 request.notopen = true; | 119 local body = request.body; |
| 119 request.log = log; | 120 |
| 120 request.on_destroy = on_destroy_request; | 121 local context = { request = request, response = response, notopen = true }; |
| 121 | 122 local stream = new_xmpp_stream(context, stream_callbacks); |
| 122 local stream = new_xmpp_stream(request, stream_callbacks); | 123 response.context = context; |
| 123 | 124 |
| 124 -- stream:feed() calls the stream_callbacks, so all stanzas in | 125 -- stream:feed() calls the stream_callbacks, so all stanzas in |
| 125 -- the body are processed in this next line before it returns. | 126 -- the body are processed in this next line before it returns. |
| 126 local ok, err = stream:feed(body); | 127 -- In particular, the streamopened() stream callback is where |
| 127 if not ok then | 128 -- much of the session logic happens, because it's where we first |
| 128 log("error", "Failed to parse BOSH payload: %s", err); | 129 -- get to see the 'sid' of this request. |
| 129 end | 130 stream:feed(body); |
| 130 | 131 |
| 131 -- Stanzas (if any) in the request have now been processed, and | 132 -- Stanzas (if any) in the request have now been processed, and |
| 132 -- we take care of the high-level BOSH logic here, including | 133 -- we take care of the high-level BOSH logic here, including |
| 133 -- giving a response or putting the request "on hold". | 134 -- giving a response or putting the request "on hold". |
| 134 local session = sessions[request.sid]; | 135 local session = sessions[context.sid]; |
| 135 if session then | 136 if session then |
| 136 -- Session was marked as inactive, since we have | 137 -- Session was marked as inactive, since we have |
| 137 -- a request open now, unmark it | 138 -- a request open now, unmark it |
| 138 if inactive_sessions[session] and #session.requests > 0 then | 139 if inactive_sessions[session] and #session.requests > 0 then |
| 139 inactive_sessions[session] = nil; | 140 inactive_sessions[session] = nil; |
| 140 end | 141 end |
| 141 | 142 |
| 142 local r = session.requests; | 143 local r = session.requests; |
| 143 log("debug", "Session %s has %d out of %d requests open", request.sid, #r, session.bosh_hold); | 144 log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold); |
| 144 log("debug", "and there are %d things in the send_buffer", #session.send_buffer); | 145 log("debug", "and there are %d things in the send_buffer:", #session.send_buffer); |
| 146 for i, thing in ipairs(session.send_buffer) do | |
| 147 log("debug", " %s", tostring(thing)); | |
| 148 end | |
| 145 if #r > session.bosh_hold then | 149 if #r > session.bosh_hold then |
| 146 -- We are holding too many requests, send what's in the buffer, | 150 -- We are holding too many requests, send what's in the buffer, |
| 147 log("debug", "We are holding too many requests, so..."); | 151 log("debug", "We are holding too many requests, so..."); |
| 148 if #session.send_buffer > 0 then | 152 if #session.send_buffer > 0 then |
| 149 log("debug", "...sending what is in the buffer") | 153 log("debug", "...sending what is in the buffer") |
| 159 local resp = t_concat(session.send_buffer); | 163 local resp = t_concat(session.send_buffer); |
| 160 session.send_buffer = {}; | 164 session.send_buffer = {}; |
| 161 session.send(resp); | 165 session.send(resp); |
| 162 end | 166 end |
| 163 | 167 |
| 164 if not request.destroyed then | 168 if not response.finished then |
| 165 -- We're keeping this request open, to respond later | 169 -- We're keeping this request open, to respond later |
| 166 log("debug", "Have nothing to say, so leaving request unanswered for now"); | 170 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
| 167 if session.bosh_wait then | 171 if session.bosh_wait then |
| 168 waiting_requests[request] = os_time() + session.bosh_wait; | 172 waiting_requests[response] = os_time() + session.bosh_wait; |
| 169 end | 173 end |
| 170 end | 174 end |
| 171 | 175 |
| 172 if session.bosh_terminate then | 176 if session.bosh_terminate then |
| 173 session.log("debug", "Closing session with %d requests open", #session.requests); | 177 session.log("debug", "Closing session with %d requests open", #session.requests); |
| 211 end | 215 end |
| 212 end | 216 end |
| 213 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); | 217 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); |
| 214 end | 218 end |
| 215 | 219 |
| 216 local session_close_response = { headers = default_headers, body = tostring(close_reply) }; | 220 local response_body = tostring(close_reply); |
| 217 | |
| 218 for _, held_request in ipairs(session.requests) do | 221 for _, held_request in ipairs(session.requests) do |
| 219 held_request:send(session_close_response); | 222 held_request.headers = default_headers; |
| 220 held_request:destroy(); | 223 held_request:send(response_body); |
| 221 end | 224 end |
| 222 sessions[session.sid] = nil; | 225 sessions[session.sid] = nil; |
| 223 inactive_sessions[session] = nil; | 226 inactive_sessions[session] = nil; |
| 224 sm_destroy_session(session); | 227 sm_destroy_session(session); |
| 225 end | 228 end |
| 226 | 229 |
| 227 -- Handle the <body> tag in the request payload. | 230 -- Handle the <body> tag in the request payload. |
| 228 function stream_callbacks.streamopened(request, attr) | 231 function stream_callbacks.streamopened(context, attr) |
| 232 local request, response = context.request, context.response; | |
| 229 local sid = attr.sid; | 233 local sid = attr.sid; |
| 230 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); | 234 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); |
| 231 if not sid then | 235 if not sid then |
| 232 -- New session request | 236 -- New session request |
| 233 request.notopen = nil; -- Signals that we accept this opening tag | 237 context.notopen = nil; -- Signals that we accept this opening tag |
| 234 | 238 |
| 235 -- TODO: Sanity checks here (rid, to, known host, etc.) | 239 -- TODO: Sanity checks here (rid, to, known host, etc.) |
| 236 if not hosts[attr.to] then | 240 if not hosts[attr.to] then |
| 237 -- Unknown host | 241 -- Unknown host |
| 238 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); | 242 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); |
| 239 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", | 243 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", |
| 240 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); | 244 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); |
| 241 request:send(tostring(close_reply)); | 245 response:send(tostring(close_reply)); |
| 242 return; | 246 return; |
| 243 end | 247 end |
| 244 | 248 |
| 245 -- New session | 249 -- New session |
| 246 sid = new_uuid(); | 250 sid = new_uuid(); |
| 256 sessions[sid] = session; | 260 sessions[sid] = session; |
| 257 | 261 |
| 258 session.log("debug", "BOSH session created for request from %s", session.ip); | 262 session.log("debug", "BOSH session created for request from %s", session.ip); |
| 259 log("info", "New BOSH session, assigned it sid '%s'", sid); | 263 log("info", "New BOSH session, assigned it sid '%s'", sid); |
| 260 local r, send_buffer = session.requests, session.send_buffer; | 264 local r, send_buffer = session.requests, session.send_buffer; |
| 261 local response = { headers = default_headers } | |
| 262 function session.send(s) | 265 function session.send(s) |
| 263 -- We need to ensure that outgoing stanzas have the jabber:client xmlns | 266 -- We need to ensure that outgoing stanzas have the jabber:client xmlns |
| 264 if s.attr and not s.attr.xmlns then | 267 if s.attr and not s.attr.xmlns then |
| 265 s = st.clone(s); | 268 s = st.clone(s); |
| 266 s.attr.xmlns = "jabber:client"; | 269 s.attr.xmlns = "jabber:client"; |
| 267 end | 270 end |
| 268 --log("debug", "Sending BOSH data: %s", tostring(s)); | 271 --log("debug", "Sending BOSH data: %s", tostring(s)); |
| 269 local oldest_request = r[1]; | 272 local oldest_request = r[1]; |
| 270 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then | 273 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then |
| 271 log("debug", "We have an open request, so sending on that"); | 274 log("debug", "We have an open request, so sending on that"); |
| 272 response.body = t_concat({ | 275 oldest_request.headers = default_headers; |
| 276 oldest_request:send(t_concat({ | |
| 273 "<body xmlns='http://jabber.org/protocol/httpbind' ", | 277 "<body xmlns='http://jabber.org/protocol/httpbind' ", |
| 274 session.bosh_terminate and "type='terminate' " or "", | 278 session.bosh_terminate and "type='terminate' " or "", |
| 275 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", | 279 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", |
| 276 tostring(s), | 280 tostring(s), |
| 277 "</body>" | 281 "</body>" |
| 278 }); | 282 })); |
| 279 oldest_request:send(response); | |
| 280 --log("debug", "Sent"); | |
| 281 if oldest_request.stayopen then | |
| 282 if #r>1 then | |
| 283 -- Move front request to back | |
| 284 t_insert(r, oldest_request); | |
| 285 t_remove(r, 1); | |
| 286 end | |
| 287 else | |
| 288 log("debug", "Destroying the request now..."); | |
| 289 oldest_request:destroy(); | |
| 290 end | |
| 291 elseif s ~= "" then | 283 elseif s ~= "" then |
| 292 log("debug", "Saved to send buffer because there are %d open requests", #r); | 284 log("debug", "Saved to send buffer because there are %d open requests", #r); |
| 293 -- Hmm, no requests are open :( | 285 -- Hmm, no requests are open :( |
| 294 t_insert(session.send_buffer, tostring(s)); | 286 t_insert(session.send_buffer, tostring(s)); |
| 295 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); | 287 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); |
| 301 | 293 |
| 302 local features = st.stanza("stream:features"); | 294 local features = st.stanza("stream:features"); |
| 303 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 295 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
| 304 fire_event("stream-features", session, features); | 296 fire_event("stream-features", session, features); |
| 305 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' | 297 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' |
| 306 local response = st.stanza("body", { xmlns = xmlns_bosh, | 298 local body = st.stanza("body", { xmlns = xmlns_bosh, |
| 307 wait = attr.wait, | 299 wait = attr.wait, |
| 308 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), | 300 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), |
| 309 polling = tostring(BOSH_DEFAULT_POLLING), | 301 polling = tostring(BOSH_DEFAULT_POLLING), |
| 310 requests = tostring(BOSH_DEFAULT_REQUESTS), | 302 requests = tostring(BOSH_DEFAULT_REQUESTS), |
| 311 hold = tostring(session.bosh_hold), | 303 hold = tostring(session.bosh_hold), |
| 313 ver = '1.6', from = session.host, | 305 ver = '1.6', from = session.host, |
| 314 secure = 'true', ["xmpp:version"] = "1.0", | 306 secure = 'true', ["xmpp:version"] = "1.0", |
| 315 ["xmlns:xmpp"] = "urn:xmpp:xbosh", | 307 ["xmlns:xmpp"] = "urn:xmpp:xbosh", |
| 316 ["xmlns:stream"] = "http://etherx.jabber.org/streams" | 308 ["xmlns:stream"] = "http://etherx.jabber.org/streams" |
| 317 }):add_child(features); | 309 }):add_child(features); |
| 318 request:send{ headers = default_headers, body = tostring(response) }; | 310 response.headers = default_headers; |
| 311 response:send(tostring(body)); | |
| 319 | 312 |
| 320 request.sid = sid; | 313 request.sid = sid; |
| 321 return; | 314 return; |
| 322 end | 315 end |
| 323 | 316 |
| 324 local session = sessions[sid]; | 317 local session = sessions[sid]; |
| 325 if not session then | 318 if not session then |
| 326 -- Unknown sid | 319 -- Unknown sid |
| 327 log("info", "Client tried to use sid '%s' which we don't know about", sid); | 320 log("info", "Client tried to use sid '%s' which we don't know about", sid); |
| 328 request:send{ headers = default_headers, body = tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })) }; | 321 response.headers = default_headers; |
| 329 request.notopen = nil; | 322 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); |
| 323 context.notopen = nil; | |
| 330 return; | 324 return; |
| 331 end | 325 end |
| 332 | 326 |
| 333 if session.rid then | 327 if session.rid then |
| 334 local rid = tonumber(attr.rid); | 328 local rid = tonumber(attr.rid); |
| 336 if diff > 1 then | 330 if diff > 1 then |
| 337 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); | 331 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); |
| 338 elseif diff <= 0 then | 332 elseif diff <= 0 then |
| 339 -- Repeated, ignore | 333 -- Repeated, ignore |
| 340 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); | 334 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); |
| 341 request.notopen = nil; | 335 context.notopen = nil; |
| 342 request.ignore = true; | 336 context.ignore = true; |
| 343 request.sid = sid; | 337 context.sid = sid; |
| 344 t_insert(session.requests, request); | 338 t_insert(session.requests, response); |
| 345 return; | 339 return; |
| 346 end | 340 end |
| 347 session.rid = rid; | 341 session.rid = rid; |
| 348 end | 342 end |
| 349 | 343 |
| 351 -- Client wants to end this session, which we'll do | 345 -- Client wants to end this session, which we'll do |
| 352 -- after processing any stanzas in this request | 346 -- after processing any stanzas in this request |
| 353 session.bosh_terminate = true; | 347 session.bosh_terminate = true; |
| 354 end | 348 end |
| 355 | 349 |
| 356 request.notopen = nil; -- Signals that we accept this opening tag | 350 context.notopen = nil; -- Signals that we accept this opening tag |
| 357 t_insert(session.requests, request); | 351 t_insert(session.requests, response); |
| 358 request.sid = sid; | 352 context.sid = sid; |
| 359 | 353 |
| 360 if session.notopen then | 354 if session.notopen then |
| 361 local features = st.stanza("stream:features"); | 355 local features = st.stanza("stream:features"); |
| 362 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 356 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
| 363 fire_event("stream-features", session, features); | 357 fire_event("stream-features", session, features); |
| 364 session.send(features); | 358 session.send(features); |
| 365 session.notopen = nil; | 359 session.notopen = nil; |
| 366 end | 360 end |
| 367 end | 361 end |
| 368 | 362 |
| 369 function stream_callbacks.handlestanza(request, stanza) | 363 function stream_callbacks.handlestanza(context, stanza) |
| 370 if request.ignore then return; end | 364 if context.ignore then return; end |
| 371 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); | 365 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); |
| 372 local session = sessions[request.sid]; | 366 local session = sessions[context.sid]; |
| 373 if session then | 367 if session then |
| 374 if stanza.attr.xmlns == xmlns_bosh then | 368 if stanza.attr.xmlns == xmlns_bosh then |
| 375 stanza.attr.xmlns = nil; | 369 stanza.attr.xmlns = nil; |
| 376 end | 370 end |
| 377 core_process_stanza(session, stanza); | 371 core_process_stanza(session, stanza); |
| 378 end | 372 end |
| 379 end | 373 end |
| 380 | 374 |
| 381 function stream_callbacks.error(request, error) | 375 function stream_callbacks.error(context, error) |
| 382 log("debug", "Error parsing BOSH request payload; %s", error); | 376 log("debug", "Error parsing BOSH request payload; %s", error); |
| 383 if not request.sid then | 377 if not context.sid then |
| 384 request:send({ headers = default_headers, status = "400 Bad Request" }); | 378 local response = context.response; |
| 379 response.headers = default_headers; | |
| 380 response.status_code = 400; | |
| 381 request:send(); | |
| 385 return; | 382 return; |
| 386 end | 383 end |
| 387 | 384 |
| 388 local session = sessions[request.sid]; | 385 local session = sessions[context.sid]; |
| 389 if error == "stream-error" then -- Remote stream error, we close normally | 386 if error == "stream-error" then -- Remote stream error, we close normally |
| 390 session:close(); | 387 session:close(); |
| 391 else | 388 else |
| 392 session:close({ condition = "bad-format", text = "Error processing stream" }); | 389 session:close({ condition = "bad-format", text = "Error processing stream" }); |
| 393 end | 390 end |
| 398 -- log("debug", "Checking for requests soon to timeout..."); | 395 -- log("debug", "Checking for requests soon to timeout..."); |
| 399 -- Identify requests timing out within the next few seconds | 396 -- Identify requests timing out within the next few seconds |
| 400 local now = os_time() + 3; | 397 local now = os_time() + 3; |
| 401 for request, reply_before in pairs(waiting_requests) do | 398 for request, reply_before in pairs(waiting_requests) do |
| 402 if reply_before <= now then | 399 if reply_before <= now then |
| 403 log("debug", "%s was soon to timeout, sending empty response", request.id); | 400 log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now); |
| 404 -- Send empty response to let the | 401 -- Send empty response to let the |
| 405 -- client know we're still here | 402 -- client know we're still here |
| 406 if request.conn then | 403 if request.conn then |
| 407 sessions[request.sid].send(""); | 404 sessions[request.context.sid].send(""); |
| 408 end | 405 end |
| 409 end | 406 end |
| 410 end | 407 end |
| 411 | 408 |
| 412 now = now - 3; | 409 now = now - 3; |
| 426 dead_sessions[i] = nil; | 423 dead_sessions[i] = nil; |
| 427 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); | 424 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); |
| 428 end | 425 end |
| 429 return 1; | 426 return 1; |
| 430 end | 427 end |
| 431 | 428 module:add_timer(1, on_timer); |
| 432 | 429 |
| 433 local function setup() | 430 function module.add_host(module) |
| 434 local ports = module:get_option_array("bosh_ports") or { 5280 }; | 431 module:depends("http"); |
| 435 httpserver.new_from_config(ports, handle_request, { base = "http-bind" }); | 432 module:provides("http", { |
| 436 timer.add_task(1, on_timer); | 433 default_path = "/http-bind"; |
| 437 end | 434 route = { |
| 438 if prosody.start_time then -- already started | 435 ["GET /"] = handle_GET; |
| 439 setup(); | 436 ["OPTIONS /"] = handle_OPTIONS; |
| 440 else | 437 ["POST /"] = handle_POST; |
| 441 prosody.events.add_handler("server-started", setup); | 438 }; |
| 442 end | 439 }); |
| 440 end |