Changeset

5789:3b05a86631b9

mod_c2s: Port coroutine code to util.async
author Matthew Wild <mwild1@gmail.com>
date Sun, 11 Aug 2013 14:46:27 +0100
parents 5788:3556f338caa3
children 5790:959163e4d631
files plugins/mod_c2s.lua
diffstat 1 files changed, 18 insertions(+), 63 deletions(-) [+]
line wrap: on
line diff
--- a/plugins/mod_c2s.lua	Sun Aug 11 14:46:07 2013 +0100
+++ b/plugins/mod_c2s.lua	Sun Aug 11 14:46:27 2013 +0100
@@ -15,11 +15,10 @@
 local st = require "util.stanza";
 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
 local uuid_generate = require "util.uuid".generate;
+local runner = require "util.async".runner;
 
 local xpcall, tostring, type = xpcall, tostring, type;
-local traceback = debug.traceback;
 local t_insert, t_remove = table.insert, table.remove;
-local co_running, co_resume = coroutine.running, coroutine.resume;
 
 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
 
@@ -35,6 +34,7 @@
 
 local stream_callbacks = { default_ns = "jabber:client" };
 local listener = {};
+local runner_callbacks = {};
 
 --- Stream events handlers
 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
@@ -119,10 +119,9 @@
 	end
 end
 
-local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
 function stream_callbacks.handlestanza(session, stanza)
 	stanza = session.filter("stanzas/in", stanza);
-	t_insert(session.pending_stanzas, stanza);
+	session.thread:run(stanza);
 end
 
 --- Session methods
@@ -189,6 +188,18 @@
 	end
 end, 200);
 
+function runner_callbacks:ready()
+	self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+	self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+	(self.data.log or log)("error", "Traceback[c2s]: %s", err);
+end
+
 --- Port listener
 function listener.onconnect(conn)
 	local session = sm_new_session(conn);
@@ -224,14 +235,9 @@
 		session.stream:reset();
 	end
 
-	session.thread = coroutine.create(function (stanza)
-		while true do
-			core_process_stanza(session, stanza);
-			stanza = coroutine.yield("ready");
-		end
-	end);
-
-	session.pending_stanzas = {};
+	session.thread = runner(function (stanza)
+		core_process_stanza(session, stanza);
+	end, runner_callbacks, session);
 
 	local filter = session.filter;
 	function session.data(data)
@@ -246,41 +252,6 @@
 				end
 			end
 		end
-
-		if co_running() ~= session.thread and not session.paused then
-			if session.state == "wait" then
-				session.state = "ready";
-				local ok, state = co_resume(session.thread);
-				if not ok then
-					log("error", "Traceback[c2s]: %s", state);
-				elseif state == "wait" then
-					return;
-				end
-			end
-			-- We're not currently running, so start the thread to process pending stanzas
-			local s, thread = session.pending_stanzas, session.thread;
-			local n = #s;
-			while n > 0 and session.state ~= "wait" do
-				session.log("debug", "processing %d stanzas", n);
-				local consumed;
-				for i = 1,n do
-					local stanza = s[i];
-					local ok, state = co_resume(thread, stanza);
-					if not ok then
-						log("error", "Traceback[c2s]: %s", state);
-					elseif state == "wait" then
-						consumed = i;
-						session.state = "wait";
-						break;
-					end
-				end
-				if not consumed then consumed = n; end
-				for i = 1, #s do
-					s[i] = s[consumed+i];
-				end
-				n = #s;
-			end
-		end
 	end
 
 	if c2s_timeout then
@@ -292,22 +263,6 @@
 	end
 
 	session.dispatch_stanza = stream_callbacks.handlestanza;
-
-	function session:sleep(by)
-		session.log("debug", "Sleeping for %s", by);
-		session.paused = by or "?";
-		session.conn:pause();
-		if co_running() == session.thread then
-			coroutine.yield("wait");
-		end
-	end
-	function session:wake(by)
-		assert(session.paused == (by or "?"));
-		session.log("debug", "Waking for %s", by);
-		session.paused = nil;
-		session.conn:resume();
-		session.data(); --FIXME: next tick?
-	end
 end
 
 function listener.onincoming(conn, data)