Comparison

plugins/mod_bosh.lua @ 7283:32754b15b28a

Backed out BOSH use of util.async (changeset f0687c313cf1)
author Kim Alvefur <zash@zash.se>
date Thu, 17 Mar 2016 12:46:52 +0100
parent 7047:9ca2b720ad43
child 7286:30b9433a9f3e
child 7326:d11701e86702
comparison
equal deleted inserted replaced
7281:21da137d83b0 7283:32754b15b28a
20 local log = logger.init("mod_bosh"); 20 local log = logger.init("mod_bosh");
21 local initialize_filters = require "util.filters".initialize; 21 local initialize_filters = require "util.filters".initialize;
22 local math_min = math.min; 22 local math_min = math.min;
23 local xpcall, tostring, type = xpcall, tostring, type; 23 local xpcall, tostring, type = xpcall, tostring, type;
24 local traceback = debug.traceback; 24 local traceback = debug.traceback;
25 local runner = require"util.async".runner;
26 25
27 local xmlns_streams = "http://etherx.jabber.org/streams"; 26 local xmlns_streams = "http://etherx.jabber.org/streams";
28 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
29 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) 28 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send)
30 29
227 sessions[session.sid] = nil; 226 sessions[session.sid] = nil;
228 inactive_sessions[session] = nil; 227 inactive_sessions[session] = nil;
229 sm_destroy_session(session); 228 sm_destroy_session(session);
230 end 229 end
231 230
232 local runner_callbacks = { };
233
234 -- Handle the <body> tag in the request payload. 231 -- Handle the <body> tag in the request payload.
235 function stream_callbacks.streamopened(context, attr) 232 function stream_callbacks.streamopened(context, attr)
236 local request, response = context.request, context.response; 233 local request, response = context.request, context.response;
237 local sid = attr.sid; 234 local sid = attr.sid;
238 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); 235 log("debug", "BOSH body open (sid: %s)", sid or "<none>");
260 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, 257 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
261 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, 258 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
262 ip = get_ip_from_request(request); 259 ip = get_ip_from_request(request);
263 }; 260 };
264 sessions[sid] = session; 261 sessions[sid] = session;
265
266 session.thread = runner(function (stanza)
267 session:dispatch_stanza(stanza);
268 end, runner_callbacks, session);
269 262
270 local filter = initialize_filters(session); 263 local filter = initialize_filters(session);
271 264
272 session.log("debug", "BOSH session created for request from %s", session.ip); 265 session.log("debug", "BOSH session created for request from %s", session.ip);
273 log("info", "New BOSH session, assigned it sid '%s'", sid); 266 log("info", "New BOSH session, assigned it sid '%s'", sid);
362 session.notopen = nil; 355 session.notopen = nil;
363 end 356 end
364 end 357 end
365 358
366 local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end 359 local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
367
368 function runner_callbacks:error(err)
369 return handleerr(err);
370 end
371
372 function stream_callbacks.handlestanza(context, stanza) 360 function stream_callbacks.handlestanza(context, stanza)
373 if context.ignore then return; end 361 if context.ignore then return; end
374 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); 362 log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
375 local session = sessions[context.sid]; 363 local session = sessions[context.sid];
376 if session then 364 if session then
377 if stanza.attr.xmlns == xmlns_bosh then 365 if stanza.attr.xmlns == xmlns_bosh then
378 stanza.attr.xmlns = nil; 366 stanza.attr.xmlns = nil;
379 end 367 end
380 stanza = session.filter("stanzas/in", stanza); 368 stanza = session.filter("stanzas/in", stanza);
381 session.thread:run(stanza); 369 if stanza then
370 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
371 end
382 end 372 end
383 end 373 end
384 374
385 function stream_callbacks.streamclosed(context) 375 function stream_callbacks.streamclosed(context)
386 local session = sessions[context.sid]; 376 local session = sessions[context.sid];