Diff

plugins/mod_c2s.lua @ 5773:c9a712673d8a

mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.
author Matthew Wild <mwild1@gmail.com>
date Fri, 09 Aug 2013 11:10:22 +0100
parent 5764:969e0a054795
child 5776:bd0ff8ae98a8
line wrap: on
line diff
--- a/plugins/mod_c2s.lua	Tue Aug 06 17:18:39 2013 +0100
+++ b/plugins/mod_c2s.lua	Fri Aug 09 11:10:22 2013 +0100
@@ -18,6 +18,8 @@
 
 local xpcall, tostring, type = xpcall, tostring, type;
 local traceback = debug.traceback;
+local t_insert, t_remove = table.insert, table.remove;
+local co_running, co_resume = coroutine.running, coroutine.resume;
 
 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
 
@@ -31,7 +33,7 @@
 local core_process_stanza = prosody.core_process_stanza;
 local hosts = prosody.hosts;
 
-local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:client" };
 local listener = {};
 
 --- Stream events handlers
@@ -120,9 +122,7 @@
 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
 function stream_callbacks.handlestanza(session, stanza)
 	stanza = session.filter("stanzas/in", stanza);
-	if stanza then
-		return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
-	end
+	t_insert(session.pending_stanzas, stanza);
 end
 
 --- Session methods
@@ -224,18 +224,65 @@
 		session.stream:reset();
 	end
 	
+	session.thread = coroutine.create(function (stanza)
+		while true do
+			core_process_stanza(session, stanza);
+			stanza = coroutine.yield("ready");
+		end
+	end);
+
+	session.pending_stanzas = {};
+
 	local filter = session.filter;
 	function session.data(data)
-		data = filter("bytes/in", data);
+		-- Parse the data, which will store stanzas in session.pending_stanzas
 		if data then
-			local ok, err = stream:feed(data);
-			if ok then return; end
-			log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
-			session:close("not-well-formed");
+			data = filter("bytes/in", data);
+			if data then
+				local ok, err = stream:feed(data);
+				if not ok then
+					log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+					session:close("not-well-formed");
+				end
+			end
+		end
+
+		if co_running() ~= session.thread and not session.paused then
+			if session.state == "wait" then
+				session.state = "ready";
+				local ok, state = co_resume(session.thread);
+				if not ok then
+					log("error", "Traceback[c2s]: %s", state);
+				elseif state == "wait" then
+					return;
+				end
+			end
+			-- We're not currently running, so start the thread to process pending stanzas
+			local s, thread = session.pending_stanzas, session.thread;
+			local n = #s;
+			while n > 0 and session.state ~= "wait" do
+				session.log("debug", "processing %d stanzas", n);
+				local consumed;
+				for i = 1,n do
+					local stanza = s[i];
+					local ok, state = co_resume(thread, stanza);
+					if not ok then
+						log("error", "Traceback[c2s]: %s", state);
+					elseif state == "wait" then
+						consumed = i;
+						session.state = "wait";
+						break;
+					end
+				end
+				if not consumed then consumed = n; end
+				for i = 1, #s do
+					s[i] = s[consumed+i];
+				end
+				n = #s;
+			end
 		end
 	end
 
-	
 	if c2s_timeout then
 		add_task(c2s_timeout, function ()
 			if session.type == "c2s_unauthed" then
@@ -245,6 +292,22 @@
 	end
 
 	session.dispatch_stanza = stream_callbacks.handlestanza;
+
+	function session:sleep(by)
+		session.log("debug", "Sleeping for %s", by);
+		session.paused = by or "?";
+		session.conn:pause();
+		if co_running() == session.thread then
+			coroutine.yield("wait");
+		end
+	end
+	function session:wake(by)
+		assert(session.paused == (by or "?"));
+		session.log("debug", "Waking for %s", by);
+		session.paused = nil;
+		session.conn:resume();
+		session.data(); --FIXME: next tick?
+	end
 end
 
 function listener.onincoming(conn, data)