Software /
code /
prosody
Changeset
5789:3b05a86631b9
mod_c2s: Port coroutine code to util.async
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 11 Aug 2013 14:46:27 +0100 |
parents | 5788:3556f338caa3 |
children | 5790:959163e4d631 |
files | plugins/mod_c2s.lua |
diffstat | 1 files changed, 18 insertions(+), 63 deletions(-) [+] |
line wrap: on
line diff
--- a/plugins/mod_c2s.lua Sun Aug 11 14:46:07 2013 +0100 +++ b/plugins/mod_c2s.lua Sun Aug 11 14:46:27 2013 +0100 @@ -15,11 +15,10 @@ local st = require "util.stanza"; local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; local uuid_generate = require "util.uuid".generate; +local runner = require "util.async".runner; local xpcall, tostring, type = xpcall, tostring, type; -local traceback = debug.traceback; local t_insert, t_remove = table.insert, table.remove; -local co_running, co_resume = coroutine.running, coroutine.resume; local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; @@ -35,6 +34,7 @@ local stream_callbacks = { default_ns = "jabber:client" }; local listener = {}; +local runner_callbacks = {}; --- Stream events handlers local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; @@ -119,10 +119,9 @@ end end -local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end function stream_callbacks.handlestanza(session, stanza) stanza = session.filter("stanzas/in", stanza); - t_insert(session.pending_stanzas, stanza); + session.thread:run(stanza); end --- Session methods @@ -189,6 +188,18 @@ end end, 200); +function runner_callbacks:ready() + self.data.conn:resume(); +end + +function runner_callbacks:waiting() + self.data.conn:pause(); +end + +function runner_callbacks:error(err) + (self.data.log or log)("error", "Traceback[c2s]: %s", err); +end + --- Port listener function listener.onconnect(conn) local session = sm_new_session(conn); @@ -224,14 +235,9 @@ session.stream:reset(); end - session.thread = coroutine.create(function (stanza) - while true do - core_process_stanza(session, stanza); - stanza = coroutine.yield("ready"); - end - end); - - session.pending_stanzas = {}; + session.thread = runner(function (stanza) + core_process_stanza(session, stanza); + end, runner_callbacks, session); local filter = session.filter; function session.data(data) @@ -246,41 +252,6 @@ end end end - - if co_running() ~= session.thread and not session.paused then - if session.state == "wait" then - session.state = "ready"; - local ok, state = co_resume(session.thread); - if not ok then - log("error", "Traceback[c2s]: %s", state); - elseif state == "wait" then - return; - end - end - -- We're not currently running, so start the thread to process pending stanzas - local s, thread = session.pending_stanzas, session.thread; - local n = #s; - while n > 0 and session.state ~= "wait" do - session.log("debug", "processing %d stanzas", n); - local consumed; - for i = 1,n do - local stanza = s[i]; - local ok, state = co_resume(thread, stanza); - if not ok then - log("error", "Traceback[c2s]: %s", state); - elseif state == "wait" then - consumed = i; - session.state = "wait"; - break; - end - end - if not consumed then consumed = n; end - for i = 1, #s do - s[i] = s[consumed+i]; - end - n = #s; - end - end end if c2s_timeout then @@ -292,22 +263,6 @@ end session.dispatch_stanza = stream_callbacks.handlestanza; - - function session:sleep(by) - session.log("debug", "Sleeping for %s", by); - session.paused = by or "?"; - session.conn:pause(); - if co_running() == session.thread then - coroutine.yield("wait"); - end - end - function session:wake(by) - assert(session.paused == (by or "?")); - session.log("debug", "Waking for %s", by); - session.paused = nil; - session.conn:resume(); - session.data(); --FIXME: next tick? - end end function listener.onincoming(conn, data)