Software /
code /
prosody
Comparison
plugins/mod_bosh.lua @ 6528:f0687c313cf1
mod_bosh: Use util.async
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sun, 07 Dec 2014 17:56:25 +0100 |
parent | 5776:bd0ff8ae98a8 |
child | 6989:118858bf47cd |
child | 7047:9ca2b720ad43 |
comparison
equal
deleted
inserted
replaced
6526:873538f0b18c | 6528:f0687c313cf1 |
---|---|
20 local log = logger.init("mod_bosh"); | 20 local log = logger.init("mod_bosh"); |
21 local initialize_filters = require "util.filters".initialize; | 21 local initialize_filters = require "util.filters".initialize; |
22 local math_min = math.min; | 22 local math_min = math.min; |
23 local xpcall, tostring, type = xpcall, tostring, type; | 23 local xpcall, tostring, type = xpcall, tostring, type; |
24 local traceback = debug.traceback; | 24 local traceback = debug.traceback; |
25 local runner = require"util.async".runner; | |
25 | 26 |
26 local xmlns_streams = "http://etherx.jabber.org/streams"; | 27 local xmlns_streams = "http://etherx.jabber.org/streams"; |
27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 28 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
28 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) | 29 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) |
29 | 30 |
226 sessions[session.sid] = nil; | 227 sessions[session.sid] = nil; |
227 inactive_sessions[session] = nil; | 228 inactive_sessions[session] = nil; |
228 sm_destroy_session(session); | 229 sm_destroy_session(session); |
229 end | 230 end |
230 | 231 |
232 local runner_callbacks = { }; | |
233 | |
231 -- Handle the <body> tag in the request payload. | 234 -- Handle the <body> tag in the request payload. |
232 function stream_callbacks.streamopened(context, attr) | 235 function stream_callbacks.streamopened(context, attr) |
233 local request, response = context.request, context.response; | 236 local request, response = context.request, context.response; |
234 local sid = attr.sid; | 237 local sid = attr.sid; |
235 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); | 238 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); |
257 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, | 260 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, |
258 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, | 261 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, |
259 ip = get_ip_from_request(request); | 262 ip = get_ip_from_request(request); |
260 }; | 263 }; |
261 sessions[sid] = session; | 264 sessions[sid] = session; |
265 | |
266 session.thread = runner(function (stanza) | |
267 session:dispatch_stanza(stanza); | |
268 end, runner_callbacks, session); | |
262 | 269 |
263 local filter = initialize_filters(session); | 270 local filter = initialize_filters(session); |
264 | 271 |
265 session.log("debug", "BOSH session created for request from %s", session.ip); | 272 session.log("debug", "BOSH session created for request from %s", session.ip); |
266 log("info", "New BOSH session, assigned it sid '%s'", sid); | 273 log("info", "New BOSH session, assigned it sid '%s'", sid); |
353 session.notopen = nil; | 360 session.notopen = nil; |
354 end | 361 end |
355 end | 362 end |
356 | 363 |
357 local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end | 364 local function handleerr(err) log("error", "Traceback[bosh]: %s", traceback(tostring(err), 2)); end |
365 | |
366 function runner_callbacks:error(err) | |
367 return handleerr(err); | |
368 end | |
369 | |
358 function stream_callbacks.handlestanza(context, stanza) | 370 function stream_callbacks.handlestanza(context, stanza) |
359 if context.ignore then return; end | 371 if context.ignore then return; end |
360 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); | 372 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); |
361 local session = sessions[context.sid]; | 373 local session = sessions[context.sid]; |
362 if session then | 374 if session then |
363 if stanza.attr.xmlns == xmlns_bosh then | 375 if stanza.attr.xmlns == xmlns_bosh then |
364 stanza.attr.xmlns = nil; | 376 stanza.attr.xmlns = nil; |
365 end | 377 end |
366 stanza = session.filter("stanzas/in", stanza); | 378 stanza = session.filter("stanzas/in", stanza); |
367 if stanza then | 379 session.thread:run(stanza); |
368 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); | |
369 end | |
370 end | 380 end |
371 end | 381 end |
372 | 382 |
373 function stream_callbacks.streamclosed(context) | 383 function stream_callbacks.streamclosed(context) |
374 local session = sessions[context.sid]; | 384 local session = sessions[context.sid]; |