# HG changeset patch # User Kim Alvefur # Date 1464608203 -7200 # Node ID 464a8a8de62528730ad4b45cd3df434e67bc4a6b # Parent 3ae19750cd4610916f622ffba6782fb1e99bf5d4 mod_s2s: Add util.async support diff -r 3ae19750cd46 -r 464a8a8de625 plugins/mod_s2s/mod_s2s.lua --- a/plugins/mod_s2s/mod_s2s.lua Mon May 30 13:30:53 2016 +0200 +++ b/plugins/mod_s2s/mod_s2s.lua Mon May 30 13:36:43 2016 +0200 @@ -26,6 +26,7 @@ local s2s_destroy_session = require "core.s2smanager".destroy_session; local uuid_gen = require "util.uuid".generate; local fire_global_event = prosody.events.fire_event; +local runner = require "util.async".runner; local s2sout = module:require("s2sout"); @@ -41,6 +42,8 @@ local sessions = module:shared("sessions"); +local runner_callbacks = {}; + local log = module._log; --- Handle stanzas to remote domains @@ -257,11 +260,21 @@ --- XMPP stream event handlers -local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza }; +local stream_callbacks = { default_ns = "jabber:server" }; + +function stream_callbacks.handlestanza(session, stanza) + stanza = session.filter("stanzas/in", stanza); + session.thread:run(stanza); +end local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; function stream_callbacks.streamopened(session, attr) + -- run _streamopened in async context + session.thread:run({ attr = attr }); +end + +function stream_callbacks._streamopened(session, attr) session.version = tonumber(attr.version) or 0; -- TODO: Rename session.secure to session.encrypted @@ -435,14 +448,6 @@ end end -local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end -function stream_callbacks.handlestanza(session, stanza) - stanza = session.filter("stanzas/in", stanza); - if stanza then - return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); - end -end - local listener = {}; --- Session methods @@ -517,6 +522,15 @@ -- Session initialization logic shared by incoming and outgoing local function initialize_session(session) local stream = new_xmpp_stream(session, stream_callbacks); + + session.thread = runner(function (stanza) + if stanza.name == nil then + stream_callbacks._streamopened(session, stanza.attr); + else + core_process_stanza(session, stanza); + end + end, runner_callbacks, session); + local log = session.log or log; session.stream = stream; @@ -580,6 +594,20 @@ end); end +function runner_callbacks:ready() + self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread)); + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[s2s]: %s", err); +end + function listener.onconnect(conn) measure_connections(1); conn:setoption("keepalive", opt_keepalives);