Software /
code /
prosody
Changeset
7451:464a8a8de625
mod_s2s: Add util.async support
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 30 May 2016 13:36:43 +0200 (2016-05-30) |
parents | 7450:3ae19750cd46 |
children | 7452:d916703d5e18 |
files | plugins/mod_s2s/mod_s2s.lua |
diffstat | 1 files changed, 37 insertions(+), 9 deletions(-) [+] |
line wrap: on
line diff
--- a/plugins/mod_s2s/mod_s2s.lua Mon May 30 13:30:53 2016 +0200 +++ b/plugins/mod_s2s/mod_s2s.lua Mon May 30 13:36:43 2016 +0200 @@ -26,6 +26,7 @@ local s2s_destroy_session = require "core.s2smanager".destroy_session; local uuid_gen = require "util.uuid".generate; local fire_global_event = prosody.events.fire_event; +local runner = require "util.async".runner; local s2sout = module:require("s2sout"); @@ -41,6 +42,8 @@ local sessions = module:shared("sessions"); +local runner_callbacks = {}; + local log = module._log; --- Handle stanzas to remote domains @@ -257,11 +260,21 @@ --- XMPP stream event handlers -local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:server" }; + +function stream_callbacks.handlestanza(session, stanza) + stanza = session.filter("stanzas/in", stanza); + session.thread:run(stanza); +end local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) + -- run _streamopened in async context + session.thread:run({ attr = attr }); +end + +function stream_callbacks._streamopened(session, attr) session.version = tonumber(attr.version) or 0; -- TODO: Rename session.secure to session.encrypted @@ -435,14 +448,6 @@ end end -local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end -function stream_callbacks.handlestanza(session, stanza) - stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end -end - local listener = {}; --- Session methods @@ -517,6 +522,15 @@ -- Session initialization logic shared by incoming and outgoing local function initialize_session(session) local stream = new_xmpp_stream(session, stream_callbacks); + + session.thread = runner(function (stanza) + if stanza.name == nil then + stream_callbacks._streamopened(session, stanza.attr); + else + core_process_stanza(session, stanza); + end + end, runner_callbacks, session); + local log = session.log or log; session.stream = stream; @@ -580,6 +594,20 @@ end); end +function runner_callbacks:ready() + self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[s2s]: %s", err); +end + function listener.onconnect(conn) measure_connections(1); conn:setoption("keepalive", opt_keepalives);