Software /
code /
prosody
Comparison
plugins/mod_bosh.lua @ 4692:8e7c683d78ca
mod_bosh: Large commit to update to mod_http/net.http.server APIs. Becomes a shared module.
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 25 Apr 2012 23:10:32 +0100 |
parent | 4690:55f690fdc915 |
child | 4697:778eb9405a98 |
comparison
equal
deleted
inserted
replaced
4691:a164fc7057ae | 4692:8e7c683d78ca |
---|---|
77 local inactive_sessions = {}; -- Sessions which have no open requests | 77 local inactive_sessions = {}; -- Sessions which have no open requests |
78 | 78 |
79 -- Used to respond to idle sessions (those with waiting requests) | 79 -- Used to respond to idle sessions (those with waiting requests) |
80 local waiting_requests = {}; | 80 local waiting_requests = {}; |
81 function on_destroy_request(request) | 81 function on_destroy_request(request) |
82 log("debug", "Request destroyed: %s", tostring(request)); | |
82 waiting_requests[request] = nil; | 83 waiting_requests[request] = nil; |
83 local session = sessions[request.sid]; | 84 local session = sessions[request.context.sid]; |
84 if session then | 85 if session then |
85 local requests = session.requests; | 86 local requests = session.requests; |
86 for i,r in ipairs(requests) do | 87 for i, r in ipairs(requests) do |
87 if r == request then | 88 if r == request then |
88 t_remove(requests, i); | 89 t_remove(requests, i); |
89 break; | 90 break; |
90 end | 91 end |
91 end | 92 end |
97 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); | 98 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); |
98 end | 99 end |
99 end | 100 end |
100 end | 101 end |
101 | 102 |
102 function handle_request(method, body, request) | 103 local function handle_GET(request) |
103 if (not body) or request.method ~= "POST" then | 104 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; |
104 if request.method == "OPTIONS" then | 105 end |
105 local headers = {}; | 106 |
106 for k,v in pairs(default_headers) do headers[k] = v; end | 107 function handle_OPTIONS(request) |
107 headers["Content-Type"] = nil; | 108 local headers = {}; |
108 return { headers = headers, body = "" }; | 109 for k,v in pairs(default_headers) do headers[k] = v; end |
109 else | 110 headers["Content-Type"] = nil; |
110 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; | 111 return { headers = headers, body = "" }; |
111 end | 112 end |
112 end | 113 |
113 if not method then | 114 function handle_POST(event) |
114 log("debug", "Request %s suffered error %s", tostring(request.id), body); | 115 log("debug", "Handling new request %s: %s\n----------", tostring(event.request), tostring(event.request.body)); |
115 return; | 116 |
116 end | 117 local request, response = event.request, event.response; |
117 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); | 118 response.on_destroy = on_destroy_request; |
118 request.notopen = true; | 119 local body = request.body; |
119 request.log = log; | 120 |
120 request.on_destroy = on_destroy_request; | 121 local context = { request = request, response = response, notopen = true }; |
121 | 122 local stream = new_xmpp_stream(context, stream_callbacks); |
122 local stream = new_xmpp_stream(request, stream_callbacks); | 123 response.context = context; |
123 | 124 |
124 -- stream:feed() calls the stream_callbacks, so all stanzas in | 125 -- stream:feed() calls the stream_callbacks, so all stanzas in |
125 -- the body are processed in this next line before it returns. | 126 -- the body are processed in this next line before it returns. |
126 local ok, err = stream:feed(body); | 127 -- In particular, the streamopened() stream callback is where |
127 if not ok then | 128 -- much of the session logic happens, because it's where we first |
128 log("error", "Failed to parse BOSH payload: %s", err); | 129 -- get to see the 'sid' of this request. |
129 end | 130 stream:feed(body); |
130 | 131 |
131 -- Stanzas (if any) in the request have now been processed, and | 132 -- Stanzas (if any) in the request have now been processed, and |
132 -- we take care of the high-level BOSH logic here, including | 133 -- we take care of the high-level BOSH logic here, including |
133 -- giving a response or putting the request "on hold". | 134 -- giving a response or putting the request "on hold". |
134 local session = sessions[request.sid]; | 135 local session = sessions[context.sid]; |
135 if session then | 136 if session then |
136 -- Session was marked as inactive, since we have | 137 -- Session was marked as inactive, since we have |
137 -- a request open now, unmark it | 138 -- a request open now, unmark it |
138 if inactive_sessions[session] and #session.requests > 0 then | 139 if inactive_sessions[session] and #session.requests > 0 then |
139 inactive_sessions[session] = nil; | 140 inactive_sessions[session] = nil; |
140 end | 141 end |
141 | 142 |
142 local r = session.requests; | 143 local r = session.requests; |
143 log("debug", "Session %s has %d out of %d requests open", request.sid, #r, session.bosh_hold); | 144 log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold); |
144 log("debug", "and there are %d things in the send_buffer", #session.send_buffer); | 145 log("debug", "and there are %d things in the send_buffer:", #session.send_buffer); |
146 for i, thing in ipairs(session.send_buffer) do | |
147 log("debug", " %s", tostring(thing)); | |
148 end | |
145 if #r > session.bosh_hold then | 149 if #r > session.bosh_hold then |
146 -- We are holding too many requests, send what's in the buffer, | 150 -- We are holding too many requests, send what's in the buffer, |
147 log("debug", "We are holding too many requests, so..."); | 151 log("debug", "We are holding too many requests, so..."); |
148 if #session.send_buffer > 0 then | 152 if #session.send_buffer > 0 then |
149 log("debug", "...sending what is in the buffer") | 153 log("debug", "...sending what is in the buffer") |
159 local resp = t_concat(session.send_buffer); | 163 local resp = t_concat(session.send_buffer); |
160 session.send_buffer = {}; | 164 session.send_buffer = {}; |
161 session.send(resp); | 165 session.send(resp); |
162 end | 166 end |
163 | 167 |
164 if not request.destroyed then | 168 if not response.finished then |
165 -- We're keeping this request open, to respond later | 169 -- We're keeping this request open, to respond later |
166 log("debug", "Have nothing to say, so leaving request unanswered for now"); | 170 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
167 if session.bosh_wait then | 171 if session.bosh_wait then |
168 waiting_requests[request] = os_time() + session.bosh_wait; | 172 waiting_requests[response] = os_time() + session.bosh_wait; |
169 end | 173 end |
170 end | 174 end |
171 | 175 |
172 if session.bosh_terminate then | 176 if session.bosh_terminate then |
173 session.log("debug", "Closing session with %d requests open", #session.requests); | 177 session.log("debug", "Closing session with %d requests open", #session.requests); |
211 end | 215 end |
212 end | 216 end |
213 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); | 217 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); |
214 end | 218 end |
215 | 219 |
216 local session_close_response = { headers = default_headers, body = tostring(close_reply) }; | 220 local response_body = tostring(close_reply); |
217 | |
218 for _, held_request in ipairs(session.requests) do | 221 for _, held_request in ipairs(session.requests) do |
219 held_request:send(session_close_response); | 222 held_request.headers = default_headers; |
220 held_request:destroy(); | 223 held_request:send(response_body); |
221 end | 224 end |
222 sessions[session.sid] = nil; | 225 sessions[session.sid] = nil; |
223 inactive_sessions[session] = nil; | 226 inactive_sessions[session] = nil; |
224 sm_destroy_session(session); | 227 sm_destroy_session(session); |
225 end | 228 end |
226 | 229 |
227 -- Handle the <body> tag in the request payload. | 230 -- Handle the <body> tag in the request payload. |
228 function stream_callbacks.streamopened(request, attr) | 231 function stream_callbacks.streamopened(context, attr) |
232 local request, response = context.request, context.response; | |
229 local sid = attr.sid; | 233 local sid = attr.sid; |
230 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); | 234 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); |
231 if not sid then | 235 if not sid then |
232 -- New session request | 236 -- New session request |
233 request.notopen = nil; -- Signals that we accept this opening tag | 237 context.notopen = nil; -- Signals that we accept this opening tag |
234 | 238 |
235 -- TODO: Sanity checks here (rid, to, known host, etc.) | 239 -- TODO: Sanity checks here (rid, to, known host, etc.) |
236 if not hosts[attr.to] then | 240 if not hosts[attr.to] then |
237 -- Unknown host | 241 -- Unknown host |
238 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); | 242 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); |
239 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", | 243 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", |
240 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); | 244 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); |
241 request:send(tostring(close_reply)); | 245 response:send(tostring(close_reply)); |
242 return; | 246 return; |
243 end | 247 end |
244 | 248 |
245 -- New session | 249 -- New session |
246 sid = new_uuid(); | 250 sid = new_uuid(); |
256 sessions[sid] = session; | 260 sessions[sid] = session; |
257 | 261 |
258 session.log("debug", "BOSH session created for request from %s", session.ip); | 262 session.log("debug", "BOSH session created for request from %s", session.ip); |
259 log("info", "New BOSH session, assigned it sid '%s'", sid); | 263 log("info", "New BOSH session, assigned it sid '%s'", sid); |
260 local r, send_buffer = session.requests, session.send_buffer; | 264 local r, send_buffer = session.requests, session.send_buffer; |
261 local response = { headers = default_headers } | |
262 function session.send(s) | 265 function session.send(s) |
263 -- We need to ensure that outgoing stanzas have the jabber:client xmlns | 266 -- We need to ensure that outgoing stanzas have the jabber:client xmlns |
264 if s.attr and not s.attr.xmlns then | 267 if s.attr and not s.attr.xmlns then |
265 s = st.clone(s); | 268 s = st.clone(s); |
266 s.attr.xmlns = "jabber:client"; | 269 s.attr.xmlns = "jabber:client"; |
267 end | 270 end |
268 --log("debug", "Sending BOSH data: %s", tostring(s)); | 271 --log("debug", "Sending BOSH data: %s", tostring(s)); |
269 local oldest_request = r[1]; | 272 local oldest_request = r[1]; |
270 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then | 273 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then |
271 log("debug", "We have an open request, so sending on that"); | 274 log("debug", "We have an open request, so sending on that"); |
272 response.body = t_concat({ | 275 oldest_request.headers = default_headers; |
276 oldest_request:send(t_concat({ | |
273 "<body xmlns='http://jabber.org/protocol/httpbind' ", | 277 "<body xmlns='http://jabber.org/protocol/httpbind' ", |
274 session.bosh_terminate and "type='terminate' " or "", | 278 session.bosh_terminate and "type='terminate' " or "", |
275 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", | 279 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", |
276 tostring(s), | 280 tostring(s), |
277 "</body>" | 281 "</body>" |
278 }); | 282 })); |
279 oldest_request:send(response); | |
280 --log("debug", "Sent"); | |
281 if oldest_request.stayopen then | |
282 if #r>1 then | |
283 -- Move front request to back | |
284 t_insert(r, oldest_request); | |
285 t_remove(r, 1); | |
286 end | |
287 else | |
288 log("debug", "Destroying the request now..."); | |
289 oldest_request:destroy(); | |
290 end | |
291 elseif s ~= "" then | 283 elseif s ~= "" then |
292 log("debug", "Saved to send buffer because there are %d open requests", #r); | 284 log("debug", "Saved to send buffer because there are %d open requests", #r); |
293 -- Hmm, no requests are open :( | 285 -- Hmm, no requests are open :( |
294 t_insert(session.send_buffer, tostring(s)); | 286 t_insert(session.send_buffer, tostring(s)); |
295 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); | 287 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); |
301 | 293 |
302 local features = st.stanza("stream:features"); | 294 local features = st.stanza("stream:features"); |
303 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 295 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
304 fire_event("stream-features", session, features); | 296 fire_event("stream-features", session, features); |
305 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' | 297 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' |
306 local response = st.stanza("body", { xmlns = xmlns_bosh, | 298 local body = st.stanza("body", { xmlns = xmlns_bosh, |
307 wait = attr.wait, | 299 wait = attr.wait, |
308 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), | 300 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), |
309 polling = tostring(BOSH_DEFAULT_POLLING), | 301 polling = tostring(BOSH_DEFAULT_POLLING), |
310 requests = tostring(BOSH_DEFAULT_REQUESTS), | 302 requests = tostring(BOSH_DEFAULT_REQUESTS), |
311 hold = tostring(session.bosh_hold), | 303 hold = tostring(session.bosh_hold), |
313 ver = '1.6', from = session.host, | 305 ver = '1.6', from = session.host, |
314 secure = 'true', ["xmpp:version"] = "1.0", | 306 secure = 'true', ["xmpp:version"] = "1.0", |
315 ["xmlns:xmpp"] = "urn:xmpp:xbosh", | 307 ["xmlns:xmpp"] = "urn:xmpp:xbosh", |
316 ["xmlns:stream"] = "http://etherx.jabber.org/streams" | 308 ["xmlns:stream"] = "http://etherx.jabber.org/streams" |
317 }):add_child(features); | 309 }):add_child(features); |
318 request:send{ headers = default_headers, body = tostring(response) }; | 310 response.headers = default_headers; |
311 response:send(tostring(body)); | |
319 | 312 |
320 request.sid = sid; | 313 request.sid = sid; |
321 return; | 314 return; |
322 end | 315 end |
323 | 316 |
324 local session = sessions[sid]; | 317 local session = sessions[sid]; |
325 if not session then | 318 if not session then |
326 -- Unknown sid | 319 -- Unknown sid |
327 log("info", "Client tried to use sid '%s' which we don't know about", sid); | 320 log("info", "Client tried to use sid '%s' which we don't know about", sid); |
328 request:send{ headers = default_headers, body = tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })) }; | 321 response.headers = default_headers; |
329 request.notopen = nil; | 322 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); |
323 context.notopen = nil; | |
330 return; | 324 return; |
331 end | 325 end |
332 | 326 |
333 if session.rid then | 327 if session.rid then |
334 local rid = tonumber(attr.rid); | 328 local rid = tonumber(attr.rid); |
336 if diff > 1 then | 330 if diff > 1 then |
337 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); | 331 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); |
338 elseif diff <= 0 then | 332 elseif diff <= 0 then |
339 -- Repeated, ignore | 333 -- Repeated, ignore |
340 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); | 334 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); |
341 request.notopen = nil; | 335 context.notopen = nil; |
342 request.ignore = true; | 336 context.ignore = true; |
343 request.sid = sid; | 337 context.sid = sid; |
344 t_insert(session.requests, request); | 338 t_insert(session.requests, response); |
345 return; | 339 return; |
346 end | 340 end |
347 session.rid = rid; | 341 session.rid = rid; |
348 end | 342 end |
349 | 343 |
351 -- Client wants to end this session, which we'll do | 345 -- Client wants to end this session, which we'll do |
352 -- after processing any stanzas in this request | 346 -- after processing any stanzas in this request |
353 session.bosh_terminate = true; | 347 session.bosh_terminate = true; |
354 end | 348 end |
355 | 349 |
356 request.notopen = nil; -- Signals that we accept this opening tag | 350 context.notopen = nil; -- Signals that we accept this opening tag |
357 t_insert(session.requests, request); | 351 t_insert(session.requests, response); |
358 request.sid = sid; | 352 context.sid = sid; |
359 | 353 |
360 if session.notopen then | 354 if session.notopen then |
361 local features = st.stanza("stream:features"); | 355 local features = st.stanza("stream:features"); |
362 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 356 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
363 fire_event("stream-features", session, features); | 357 fire_event("stream-features", session, features); |
364 session.send(features); | 358 session.send(features); |
365 session.notopen = nil; | 359 session.notopen = nil; |
366 end | 360 end |
367 end | 361 end |
368 | 362 |
369 function stream_callbacks.handlestanza(request, stanza) | 363 function stream_callbacks.handlestanza(context, stanza) |
370 if request.ignore then return; end | 364 if context.ignore then return; end |
371 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); | 365 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); |
372 local session = sessions[request.sid]; | 366 local session = sessions[context.sid]; |
373 if session then | 367 if session then |
374 if stanza.attr.xmlns == xmlns_bosh then | 368 if stanza.attr.xmlns == xmlns_bosh then |
375 stanza.attr.xmlns = nil; | 369 stanza.attr.xmlns = nil; |
376 end | 370 end |
377 core_process_stanza(session, stanza); | 371 core_process_stanza(session, stanza); |
378 end | 372 end |
379 end | 373 end |
380 | 374 |
381 function stream_callbacks.error(request, error) | 375 function stream_callbacks.error(context, error) |
382 log("debug", "Error parsing BOSH request payload; %s", error); | 376 log("debug", "Error parsing BOSH request payload; %s", error); |
383 if not request.sid then | 377 if not context.sid then |
384 request:send({ headers = default_headers, status = "400 Bad Request" }); | 378 local response = context.response; |
379 response.headers = default_headers; | |
380 response.status_code = 400; | |
381 request:send(); | |
385 return; | 382 return; |
386 end | 383 end |
387 | 384 |
388 local session = sessions[request.sid]; | 385 local session = sessions[context.sid]; |
389 if error == "stream-error" then -- Remote stream error, we close normally | 386 if error == "stream-error" then -- Remote stream error, we close normally |
390 session:close(); | 387 session:close(); |
391 else | 388 else |
392 session:close({ condition = "bad-format", text = "Error processing stream" }); | 389 session:close({ condition = "bad-format", text = "Error processing stream" }); |
393 end | 390 end |
398 -- log("debug", "Checking for requests soon to timeout..."); | 395 -- log("debug", "Checking for requests soon to timeout..."); |
399 -- Identify requests timing out within the next few seconds | 396 -- Identify requests timing out within the next few seconds |
400 local now = os_time() + 3; | 397 local now = os_time() + 3; |
401 for request, reply_before in pairs(waiting_requests) do | 398 for request, reply_before in pairs(waiting_requests) do |
402 if reply_before <= now then | 399 if reply_before <= now then |
403 log("debug", "%s was soon to timeout, sending empty response", request.id); | 400 log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now); |
404 -- Send empty response to let the | 401 -- Send empty response to let the |
405 -- client know we're still here | 402 -- client know we're still here |
406 if request.conn then | 403 if request.conn then |
407 sessions[request.sid].send(""); | 404 sessions[request.context.sid].send(""); |
408 end | 405 end |
409 end | 406 end |
410 end | 407 end |
411 | 408 |
412 now = now - 3; | 409 now = now - 3; |
426 dead_sessions[i] = nil; | 423 dead_sessions[i] = nil; |
427 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); | 424 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); |
428 end | 425 end |
429 return 1; | 426 return 1; |
430 end | 427 end |
431 | 428 module:add_timer(1, on_timer); |
432 | 429 |
433 local function setup() | 430 function module.add_host(module) |
434 local ports = module:get_option_array("bosh_ports") or { 5280 }; | 431 module:depends("http"); |
435 httpserver.new_from_config(ports, handle_request, { base = "http-bind" }); | 432 module:provides("http", { |
436 timer.add_task(1, on_timer); | 433 default_path = "/http-bind"; |
437 end | 434 route = { |
438 if prosody.start_time then -- already started | 435 ["GET /"] = handle_GET; |
439 setup(); | 436 ["OPTIONS /"] = handle_OPTIONS; |
440 else | 437 ["POST /"] = handle_POST; |
441 prosody.events.add_handler("server-started", setup); | 438 }; |
442 end | 439 }); |
440 end |