Comparison

plugins/mod_c2s.lua @ 5773:c9a712673d8a

mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.
author Matthew Wild <mwild1@gmail.com>
date Fri, 09 Aug 2013 11:10:22 +0100
parent 5764:969e0a054795
child 5776:bd0ff8ae98a8
comparison
equal deleted inserted replaced
5772:9cef4b5c2fe3 5773:c9a712673d8a
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
17 local uuid_generate = require "util.uuid".generate; 17 local uuid_generate = require "util.uuid".generate;
18 18
19 local xpcall, tostring, type = xpcall, tostring, type; 19 local xpcall, tostring, type = xpcall, tostring, type;
20 local traceback = debug.traceback; 20 local traceback = debug.traceback;
21 local t_insert, t_remove = table.insert, table.remove;
22 local co_running, co_resume = coroutine.running, coroutine.resume;
21 23
22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
23 25
24 local log = module._log; 26 local log = module._log;
25 27
29 31
30 local sessions = module:shared("sessions"); 32 local sessions = module:shared("sessions");
31 local core_process_stanza = prosody.core_process_stanza; 33 local core_process_stanza = prosody.core_process_stanza;
32 local hosts = prosody.hosts; 34 local hosts = prosody.hosts;
33 35
34 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; 36 local stream_callbacks = { default_ns = "jabber:client" };
35 local listener = {}; 37 local listener = {};
36 38
37 --- Stream events handlers 39 --- Stream events handlers
38 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
39 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; 41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
118 end 120 end
119 121
120 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end 122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
121 function stream_callbacks.handlestanza(session, stanza) 123 function stream_callbacks.handlestanza(session, stanza)
122 stanza = session.filter("stanzas/in", stanza); 124 stanza = session.filter("stanzas/in", stanza);
123 if stanza then 125 t_insert(session.pending_stanzas, stanza);
124 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
125 end
126 end 126 end
127 127
128 --- Session methods 128 --- Session methods
129 local function session_close(session, reason) 129 local function session_close(session, reason)
130 local log = session.log or log; 130 local log = session.log or log;
222 function session.reset_stream() 222 function session.reset_stream()
223 session.notopen = true; 223 session.notopen = true;
224 session.stream:reset(); 224 session.stream:reset();
225 end 225 end
226 226
227 session.thread = coroutine.create(function (stanza)
228 while true do
229 core_process_stanza(session, stanza);
230 stanza = coroutine.yield("ready");
231 end
232 end);
233
234 session.pending_stanzas = {};
235
227 local filter = session.filter; 236 local filter = session.filter;
228 function session.data(data) 237 function session.data(data)
229 data = filter("bytes/in", data); 238 -- Parse the data, which will store stanzas in session.pending_stanzas
230 if data then 239 if data then
231 local ok, err = stream:feed(data); 240 data = filter("bytes/in", data);
232 if ok then return; end 241 if data then
233 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); 242 local ok, err = stream:feed(data);
234 session:close("not-well-formed"); 243 if not ok then
235 end 244 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
236 end 245 session:close("not-well-formed");
237 246 end
238 247 end
248 end
249
250 if co_running() ~= session.thread and not session.paused then
251 if session.state == "wait" then
252 session.state = "ready";
253 local ok, state = co_resume(session.thread);
254 if not ok then
255 log("error", "Traceback[c2s]: %s", state);
256 elseif state == "wait" then
257 return;
258 end
259 end
260 -- We're not currently running, so start the thread to process pending stanzas
261 local s, thread = session.pending_stanzas, session.thread;
262 local n = #s;
263 while n > 0 and session.state ~= "wait" do
264 session.log("debug", "processing %d stanzas", n);
265 local consumed;
266 for i = 1,n do
267 local stanza = s[i];
268 local ok, state = co_resume(thread, stanza);
269 if not ok then
270 log("error", "Traceback[c2s]: %s", state);
271 elseif state == "wait" then
272 consumed = i;
273 session.state = "wait";
274 break;
275 end
276 end
277 if not consumed then consumed = n; end
278 for i = 1, #s do
279 s[i] = s[consumed+i];
280 end
281 n = #s;
282 end
283 end
284 end
285
239 if c2s_timeout then 286 if c2s_timeout then
240 add_task(c2s_timeout, function () 287 add_task(c2s_timeout, function ()
241 if session.type == "c2s_unauthed" then 288 if session.type == "c2s_unauthed" then
242 session:close("connection-timeout"); 289 session:close("connection-timeout");
243 end 290 end
244 end); 291 end);
245 end 292 end
246 293
247 session.dispatch_stanza = stream_callbacks.handlestanza; 294 session.dispatch_stanza = stream_callbacks.handlestanza;
295
296 function session:sleep(by)
297 session.log("debug", "Sleeping for %s", by);
298 session.paused = by or "?";
299 session.conn:pause();
300 if co_running() == session.thread then
301 coroutine.yield("wait");
302 end
303 end
304 function session:wake(by)
305 assert(session.paused == (by or "?"));
306 session.log("debug", "Waking for %s", by);
307 session.paused = nil;
308 session.conn:resume();
309 session.data(); --FIXME: next tick?
310 end
248 end 311 end
249 312
250 function listener.onincoming(conn, data) 313 function listener.onincoming(conn, data)
251 local session = sessions[conn]; 314 local session = sessions[conn];
252 if session then 315 if session then