Comparison

plugins/mod_bosh.lua @ 4692:8e7c683d78ca

mod_bosh: Large commit to update to mod_http/net.http.server APIs. Becomes a shared module.
author Matthew Wild <mwild1@gmail.com>
date Wed, 25 Apr 2012 23:10:32 +0100
parent 4690:55f690fdc915
child 4697:778eb9405a98
comparison
equal deleted inserted replaced
4691:a164fc7057ae 4692:8e7c683d78ca
77 local inactive_sessions = {}; -- Sessions which have no open requests 77 local inactive_sessions = {}; -- Sessions which have no open requests
78 78
79 -- Used to respond to idle sessions (those with waiting requests) 79 -- Used to respond to idle sessions (those with waiting requests)
80 local waiting_requests = {}; 80 local waiting_requests = {};
81 function on_destroy_request(request) 81 function on_destroy_request(request)
82 log("debug", "Request destroyed: %s", tostring(request));
82 waiting_requests[request] = nil; 83 waiting_requests[request] = nil;
83 local session = sessions[request.sid]; 84 local session = sessions[request.context.sid];
84 if session then 85 if session then
85 local requests = session.requests; 86 local requests = session.requests;
86 for i,r in ipairs(requests) do 87 for i, r in ipairs(requests) do
87 if r == request then 88 if r == request then
88 t_remove(requests, i); 89 t_remove(requests, i);
89 break; 90 break;
90 end 91 end
91 end 92 end
97 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); 98 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
98 end 99 end
99 end 100 end
100 end 101 end
101 102
102 function handle_request(method, body, request) 103 local function handle_GET(request)
103 if (not body) or request.method ~= "POST" then 104 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>";
104 if request.method == "OPTIONS" then 105 end
105 local headers = {}; 106
106 for k,v in pairs(default_headers) do headers[k] = v; end 107 function handle_OPTIONS(request)
107 headers["Content-Type"] = nil; 108 local headers = {};
108 return { headers = headers, body = "" }; 109 for k,v in pairs(default_headers) do headers[k] = v; end
109 else 110 headers["Content-Type"] = nil;
110 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; 111 return { headers = headers, body = "" };
111 end 112 end
112 end 113
113 if not method then 114 function handle_POST(event)
114 log("debug", "Request %s suffered error %s", tostring(request.id), body); 115 log("debug", "Handling new request %s: %s\n----------", tostring(event.request), tostring(event.request.body));
115 return; 116
116 end 117 local request, response = event.request, event.response;
117 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); 118 response.on_destroy = on_destroy_request;
118 request.notopen = true; 119 local body = request.body;
119 request.log = log; 120
120 request.on_destroy = on_destroy_request; 121 local context = { request = request, response = response, notopen = true };
121 122 local stream = new_xmpp_stream(context, stream_callbacks);
122 local stream = new_xmpp_stream(request, stream_callbacks); 123 response.context = context;
123 124
124 -- stream:feed() calls the stream_callbacks, so all stanzas in 125 -- stream:feed() calls the stream_callbacks, so all stanzas in
125 -- the body are processed in this next line before it returns. 126 -- the body are processed in this next line before it returns.
126 local ok, err = stream:feed(body); 127 -- In particular, the streamopened() stream callback is where
127 if not ok then 128 -- much of the session logic happens, because it's where we first
128 log("error", "Failed to parse BOSH payload: %s", err); 129 -- get to see the 'sid' of this request.
129 end 130 stream:feed(body);
130 131
131 -- Stanzas (if any) in the request have now been processed, and 132 -- Stanzas (if any) in the request have now been processed, and
132 -- we take care of the high-level BOSH logic here, including 133 -- we take care of the high-level BOSH logic here, including
133 -- giving a response or putting the request "on hold". 134 -- giving a response or putting the request "on hold".
134 local session = sessions[request.sid]; 135 local session = sessions[context.sid];
135 if session then 136 if session then
136 -- Session was marked as inactive, since we have 137 -- Session was marked as inactive, since we have
137 -- a request open now, unmark it 138 -- a request open now, unmark it
138 if inactive_sessions[session] and #session.requests > 0 then 139 if inactive_sessions[session] and #session.requests > 0 then
139 inactive_sessions[session] = nil; 140 inactive_sessions[session] = nil;
140 end 141 end
141 142
142 local r = session.requests; 143 local r = session.requests;
143 log("debug", "Session %s has %d out of %d requests open", request.sid, #r, session.bosh_hold); 144 log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold);
144 log("debug", "and there are %d things in the send_buffer", #session.send_buffer); 145 log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
146 for i, thing in ipairs(session.send_buffer) do
147 log("debug", " %s", tostring(thing));
148 end
145 if #r > session.bosh_hold then 149 if #r > session.bosh_hold then
146 -- We are holding too many requests, send what's in the buffer, 150 -- We are holding too many requests, send what's in the buffer,
147 log("debug", "We are holding too many requests, so..."); 151 log("debug", "We are holding too many requests, so...");
148 if #session.send_buffer > 0 then 152 if #session.send_buffer > 0 then
149 log("debug", "...sending what is in the buffer") 153 log("debug", "...sending what is in the buffer")
159 local resp = t_concat(session.send_buffer); 163 local resp = t_concat(session.send_buffer);
160 session.send_buffer = {}; 164 session.send_buffer = {};
161 session.send(resp); 165 session.send(resp);
162 end 166 end
163 167
164 if not request.destroyed then 168 if not response.finished then
165 -- We're keeping this request open, to respond later 169 -- We're keeping this request open, to respond later
166 log("debug", "Have nothing to say, so leaving request unanswered for now"); 170 log("debug", "Have nothing to say, so leaving request unanswered for now");
167 if session.bosh_wait then 171 if session.bosh_wait then
168 waiting_requests[request] = os_time() + session.bosh_wait; 172 waiting_requests[response] = os_time() + session.bosh_wait;
169 end 173 end
170 end 174 end
171 175
172 if session.bosh_terminate then 176 if session.bosh_terminate then
173 session.log("debug", "Closing session with %d requests open", #session.requests); 177 session.log("debug", "Closing session with %d requests open", #session.requests);
211 end 215 end
212 end 216 end
213 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); 217 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply));
214 end 218 end
215 219
216 local session_close_response = { headers = default_headers, body = tostring(close_reply) }; 220 local response_body = tostring(close_reply);
217
218 for _, held_request in ipairs(session.requests) do 221 for _, held_request in ipairs(session.requests) do
219 held_request:send(session_close_response); 222 held_request.headers = default_headers;
220 held_request:destroy(); 223 held_request:send(response_body);
221 end 224 end
222 sessions[session.sid] = nil; 225 sessions[session.sid] = nil;
223 inactive_sessions[session] = nil; 226 inactive_sessions[session] = nil;
224 sm_destroy_session(session); 227 sm_destroy_session(session);
225 end 228 end
226 229
227 -- Handle the <body> tag in the request payload. 230 -- Handle the <body> tag in the request payload.
228 function stream_callbacks.streamopened(request, attr) 231 function stream_callbacks.streamopened(context, attr)
232 local request, response = context.request, context.response;
229 local sid = attr.sid; 233 local sid = attr.sid;
230 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); 234 log("debug", "BOSH body open (sid: %s)", sid or "<none>");
231 if not sid then 235 if not sid then
232 -- New session request 236 -- New session request
233 request.notopen = nil; -- Signals that we accept this opening tag 237 context.notopen = nil; -- Signals that we accept this opening tag
234 238
235 -- TODO: Sanity checks here (rid, to, known host, etc.) 239 -- TODO: Sanity checks here (rid, to, known host, etc.)
236 if not hosts[attr.to] then 240 if not hosts[attr.to] then
237 -- Unknown host 241 -- Unknown host
238 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); 242 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
239 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", 243 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
240 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); 244 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
241 request:send(tostring(close_reply)); 245 response:send(tostring(close_reply));
242 return; 246 return;
243 end 247 end
244 248
245 -- New session 249 -- New session
246 sid = new_uuid(); 250 sid = new_uuid();
256 sessions[sid] = session; 260 sessions[sid] = session;
257 261
258 session.log("debug", "BOSH session created for request from %s", session.ip); 262 session.log("debug", "BOSH session created for request from %s", session.ip);
259 log("info", "New BOSH session, assigned it sid '%s'", sid); 263 log("info", "New BOSH session, assigned it sid '%s'", sid);
260 local r, send_buffer = session.requests, session.send_buffer; 264 local r, send_buffer = session.requests, session.send_buffer;
261 local response = { headers = default_headers }
262 function session.send(s) 265 function session.send(s)
263 -- We need to ensure that outgoing stanzas have the jabber:client xmlns 266 -- We need to ensure that outgoing stanzas have the jabber:client xmlns
264 if s.attr and not s.attr.xmlns then 267 if s.attr and not s.attr.xmlns then
265 s = st.clone(s); 268 s = st.clone(s);
266 s.attr.xmlns = "jabber:client"; 269 s.attr.xmlns = "jabber:client";
267 end 270 end
268 --log("debug", "Sending BOSH data: %s", tostring(s)); 271 --log("debug", "Sending BOSH data: %s", tostring(s));
269 local oldest_request = r[1]; 272 local oldest_request = r[1];
270 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then 273 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then
271 log("debug", "We have an open request, so sending on that"); 274 log("debug", "We have an open request, so sending on that");
272 response.body = t_concat({ 275 oldest_request.headers = default_headers;
276 oldest_request:send(t_concat({
273 "<body xmlns='http://jabber.org/protocol/httpbind' ", 277 "<body xmlns='http://jabber.org/protocol/httpbind' ",
274 session.bosh_terminate and "type='terminate' " or "", 278 session.bosh_terminate and "type='terminate' " or "",
275 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", 279 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>",
276 tostring(s), 280 tostring(s),
277 "</body>" 281 "</body>"
278 }); 282 }));
279 oldest_request:send(response);
280 --log("debug", "Sent");
281 if oldest_request.stayopen then
282 if #r>1 then
283 -- Move front request to back
284 t_insert(r, oldest_request);
285 t_remove(r, 1);
286 end
287 else
288 log("debug", "Destroying the request now...");
289 oldest_request:destroy();
290 end
291 elseif s ~= "" then 283 elseif s ~= "" then
292 log("debug", "Saved to send buffer because there are %d open requests", #r); 284 log("debug", "Saved to send buffer because there are %d open requests", #r);
293 -- Hmm, no requests are open :( 285 -- Hmm, no requests are open :(
294 t_insert(session.send_buffer, tostring(s)); 286 t_insert(session.send_buffer, tostring(s));
295 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); 287 log("debug", "There are now %d things in the send_buffer", #session.send_buffer);
301 293
302 local features = st.stanza("stream:features"); 294 local features = st.stanza("stream:features");
303 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); 295 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
304 fire_event("stream-features", session, features); 296 fire_event("stream-features", session, features);
305 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' 297 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
306 local response = st.stanza("body", { xmlns = xmlns_bosh, 298 local body = st.stanza("body", { xmlns = xmlns_bosh,
307 wait = attr.wait, 299 wait = attr.wait,
308 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), 300 inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
309 polling = tostring(BOSH_DEFAULT_POLLING), 301 polling = tostring(BOSH_DEFAULT_POLLING),
310 requests = tostring(BOSH_DEFAULT_REQUESTS), 302 requests = tostring(BOSH_DEFAULT_REQUESTS),
311 hold = tostring(session.bosh_hold), 303 hold = tostring(session.bosh_hold),
313 ver = '1.6', from = session.host, 305 ver = '1.6', from = session.host,
314 secure = 'true', ["xmpp:version"] = "1.0", 306 secure = 'true', ["xmpp:version"] = "1.0",
315 ["xmlns:xmpp"] = "urn:xmpp:xbosh", 307 ["xmlns:xmpp"] = "urn:xmpp:xbosh",
316 ["xmlns:stream"] = "http://etherx.jabber.org/streams" 308 ["xmlns:stream"] = "http://etherx.jabber.org/streams"
317 }):add_child(features); 309 }):add_child(features);
318 request:send{ headers = default_headers, body = tostring(response) }; 310 response.headers = default_headers;
311 response:send(tostring(body));
319 312
320 request.sid = sid; 313 request.sid = sid;
321 return; 314 return;
322 end 315 end
323 316
324 local session = sessions[sid]; 317 local session = sessions[sid];
325 if not session then 318 if not session then
326 -- Unknown sid 319 -- Unknown sid
327 log("info", "Client tried to use sid '%s' which we don't know about", sid); 320 log("info", "Client tried to use sid '%s' which we don't know about", sid);
328 request:send{ headers = default_headers, body = tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })) }; 321 response.headers = default_headers;
329 request.notopen = nil; 322 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
323 context.notopen = nil;
330 return; 324 return;
331 end 325 end
332 326
333 if session.rid then 327 if session.rid then
334 local rid = tonumber(attr.rid); 328 local rid = tonumber(attr.rid);
336 if diff > 1 then 330 if diff > 1 then
337 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); 331 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
338 elseif diff <= 0 then 332 elseif diff <= 0 then
339 -- Repeated, ignore 333 -- Repeated, ignore
340 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); 334 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff);
341 request.notopen = nil; 335 context.notopen = nil;
342 request.ignore = true; 336 context.ignore = true;
343 request.sid = sid; 337 context.sid = sid;
344 t_insert(session.requests, request); 338 t_insert(session.requests, response);
345 return; 339 return;
346 end 340 end
347 session.rid = rid; 341 session.rid = rid;
348 end 342 end
349 343
351 -- Client wants to end this session, which we'll do 345 -- Client wants to end this session, which we'll do
352 -- after processing any stanzas in this request 346 -- after processing any stanzas in this request
353 session.bosh_terminate = true; 347 session.bosh_terminate = true;
354 end 348 end
355 349
356 request.notopen = nil; -- Signals that we accept this opening tag 350 context.notopen = nil; -- Signals that we accept this opening tag
357 t_insert(session.requests, request); 351 t_insert(session.requests, response);
358 request.sid = sid; 352 context.sid = sid;
359 353
360 if session.notopen then 354 if session.notopen then
361 local features = st.stanza("stream:features"); 355 local features = st.stanza("stream:features");
362 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); 356 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
363 fire_event("stream-features", session, features); 357 fire_event("stream-features", session, features);
364 session.send(features); 358 session.send(features);
365 session.notopen = nil; 359 session.notopen = nil;
366 end 360 end
367 end 361 end
368 362
369 function stream_callbacks.handlestanza(request, stanza) 363 function stream_callbacks.handlestanza(context, stanza)
370 if request.ignore then return; end 364 if context.ignore then return; end
371 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); 365 log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
372 local session = sessions[request.sid]; 366 local session = sessions[context.sid];
373 if session then 367 if session then
374 if stanza.attr.xmlns == xmlns_bosh then 368 if stanza.attr.xmlns == xmlns_bosh then
375 stanza.attr.xmlns = nil; 369 stanza.attr.xmlns = nil;
376 end 370 end
377 core_process_stanza(session, stanza); 371 core_process_stanza(session, stanza);
378 end 372 end
379 end 373 end
380 374
381 function stream_callbacks.error(request, error) 375 function stream_callbacks.error(context, error)
382 log("debug", "Error parsing BOSH request payload; %s", error); 376 log("debug", "Error parsing BOSH request payload; %s", error);
383 if not request.sid then 377 if not context.sid then
384 request:send({ headers = default_headers, status = "400 Bad Request" }); 378 local response = context.response;
379 response.headers = default_headers;
380 response.status_code = 400;
381 request:send();
385 return; 382 return;
386 end 383 end
387 384
388 local session = sessions[request.sid]; 385 local session = sessions[context.sid];
389 if error == "stream-error" then -- Remote stream error, we close normally 386 if error == "stream-error" then -- Remote stream error, we close normally
390 session:close(); 387 session:close();
391 else 388 else
392 session:close({ condition = "bad-format", text = "Error processing stream" }); 389 session:close({ condition = "bad-format", text = "Error processing stream" });
393 end 390 end
398 -- log("debug", "Checking for requests soon to timeout..."); 395 -- log("debug", "Checking for requests soon to timeout...");
399 -- Identify requests timing out within the next few seconds 396 -- Identify requests timing out within the next few seconds
400 local now = os_time() + 3; 397 local now = os_time() + 3;
401 for request, reply_before in pairs(waiting_requests) do 398 for request, reply_before in pairs(waiting_requests) do
402 if reply_before <= now then 399 if reply_before <= now then
403 log("debug", "%s was soon to timeout, sending empty response", request.id); 400 log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now);
404 -- Send empty response to let the 401 -- Send empty response to let the
405 -- client know we're still here 402 -- client know we're still here
406 if request.conn then 403 if request.conn then
407 sessions[request.sid].send(""); 404 sessions[request.context.sid].send("");
408 end 405 end
409 end 406 end
410 end 407 end
411 408
412 now = now - 3; 409 now = now - 3;
426 dead_sessions[i] = nil; 423 dead_sessions[i] = nil;
427 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); 424 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds");
428 end 425 end
429 return 1; 426 return 1;
430 end 427 end
431 428 module:add_timer(1, on_timer);
432 429
433 local function setup() 430 function module.add_host(module)
434 local ports = module:get_option_array("bosh_ports") or { 5280 }; 431 module:depends("http");
435 httpserver.new_from_config(ports, handle_request, { base = "http-bind" }); 432 module:provides("http", {
436 timer.add_task(1, on_timer); 433 default_path = "/http-bind";
437 end 434 route = {
438 if prosody.start_time then -- already started 435 ["GET /"] = handle_GET;
439 setup(); 436 ["OPTIONS /"] = handle_OPTIONS;
440 else 437 ["POST /"] = handle_POST;
441 prosody.events.add_handler("server-started", setup); 438 };
442 end 439 });
440 end