Comparison

plugins/mod_bosh.lua @ 6528:f0687c313cf1

mod_bosh: Use util.async
author Kim Alvefur <zash@zash.se>
date Sun, 07 Dec 2014 17:56:25 +0100
parent 5776:bd0ff8ae98a8
child 6989:118858bf47cd
child 7047:9ca2b720ad43
comparison
Legend: equal | deleted | inserted | replaced
Left column: 6526:873538f0b18c — Right column: 6528:f0687c313cf1
20 local log = logger.init("mod_bosh"); 20 local log = logger.init("mod_bosh");
21 local initialize_filters = require "util.filters".initialize; 21 local initialize_filters = require "util.filters".initialize;
22 local math_min = math.min; 22 local math_min = math.min;
23 local xpcall, tostring, type = xpcall, tostring, type; 23 local xpcall, tostring, type = xpcall, tostring, type;
24 local traceback = debug.traceback; 24 local traceback = debug.traceback;
25 local runner = require"util.async".runner;
25 26
26 local xmlns_streams = "http://etherx.jabber.org/streams"; 27 local xmlns_streams = "http://etherx.jabber.org/streams";
27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 28 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
28 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) 29 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send)
29 30
226 sessions[session.sid] = nil; 227 sessions[session.sid] = nil;
227 inactive_sessions[session] = nil; 228 inactive_sessions[session] = nil;
228 sm_destroy_session(session); 229 sm_destroy_session(session);
229 end 230 end
230 231
232 local runner_callbacks = { };
233
231 -- Handle the <body> tag in the request payload. 234 -- Handle the <body> tag in the request payload.
232 function stream_callbacks.streamopened(context, attr) 235 function stream_callbacks.streamopened(context, attr)
233 local request, response = context.request, context.response; 236 local request, response = context.request, context.response;
234 local sid = attr.sid; 237 local sid = attr.sid;
235 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); 238 log("debug", "BOSH body open (sid: %s)", sid or "<none>");
257 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, 260 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
258 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, 261 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
259 ip = get_ip_from_request(request); 262 ip = get_ip_from_request(request);
260 }; 263 };
261 sessions[sid] = session; 264 sessions[sid] = session;
265
266 session.thread = runner(function (stanza)
267 session:dispatch_stanza(stanza);
268 end, runner_callbacks, session);
262 269
263 local filter = initialize_filters(session); 270 local filter = initialize_filters(session);
264 271
265 session.log("debug", "BOSH session created for request from %s", session.ip); 272 session.log("debug", "BOSH session created for request from %s", session.ip);
266 log("info", "New BOSH session, assigned it sid '%s'", sid); 273 log("info", "New BOSH session, assigned it sid '%s'", sid);
353 session.notopen = nil; 360 session.notopen = nil;
354 end 361 end
355 end 362 end
356 363
357 local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end 364 local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end
365
366 function runner_callbacks:error(err)
367 return handleerr(err);
368 end
369
358 function stream_callbacks.handlestanza(context, stanza) 370 function stream_callbacks.handlestanza(context, stanza)
359 if context.ignore then return; end 371 if context.ignore then return; end
360 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); 372 log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
361 local session = sessions[context.sid]; 373 local session = sessions[context.sid];
362 if session then 374 if session then
363 if stanza.attr.xmlns == xmlns_bosh then 375 if stanza.attr.xmlns == xmlns_bosh then
364 stanza.attr.xmlns = nil; 376 stanza.attr.xmlns = nil;
365 end 377 end
366 stanza = session.filter("stanzas/in", stanza); 378 stanza = session.filter("stanzas/in", stanza);
367 if stanza then 379 session.thread:run(stanza);
368 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
369 end
370 end 380 end
371 end 381 end
372 382
373 function stream_callbacks.streamclosed(context) 383 function stream_callbacks.streamclosed(context)
374 local session = sessions[context.sid]; 384 local session = sessions[context.sid];