Comparison

plugins/mod_c2s.lua @ 5789:3b05a86631b9

mod_c2s: Port coroutine code to util.async
author Matthew Wild <mwild1@gmail.com>
date Sun, 11 Aug 2013 14:46:27 +0100
parent 5776:bd0ff8ae98a8
child 5801:224644752bf4
comparison
equal deleted inserted replaced
5788:3556f338caa3 5789:3b05a86631b9
13 local nameprep = require "util.encodings".stringprep.nameprep; 13 local nameprep = require "util.encodings".stringprep.nameprep;
14 local sessionmanager = require "core.sessionmanager"; 14 local sessionmanager = require "core.sessionmanager";
15 local st = require "util.stanza"; 15 local st = require "util.stanza";
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
17 local uuid_generate = require "util.uuid".generate; 17 local uuid_generate = require "util.uuid".generate;
18 local runner = require "util.async".runner;
18 19
19 local xpcall, tostring, type = xpcall, tostring, type; 20 local xpcall, tostring, type = xpcall, tostring, type;
20 local traceback = debug.traceback;
21 local t_insert, t_remove = table.insert, table.remove; 21 local t_insert, t_remove = table.insert, table.remove;
22 local co_running, co_resume = coroutine.running, coroutine.resume;
23 22
24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 23 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
25 24
26 local log = module._log; 25 local log = module._log;
27 26
33 local core_process_stanza = prosody.core_process_stanza; 32 local core_process_stanza = prosody.core_process_stanza;
34 local hosts = prosody.hosts; 33 local hosts = prosody.hosts;
35 34
36 local stream_callbacks = { default_ns = "jabber:client" }; 35 local stream_callbacks = { default_ns = "jabber:client" };
37 local listener = {}; 36 local listener = {};
37 local runner_callbacks = {};
38 38
39 --- Stream events handlers 39 --- Stream events handlers
40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; 41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
42 42
117 session.log("info", "Session closed by remote with error: %s", text); 117 session.log("info", "Session closed by remote with error: %s", text);
118 session:close(nil, text); 118 session:close(nil, text);
119 end 119 end
120 end 120 end
121 121
122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
123 function stream_callbacks.handlestanza(session, stanza) 122 function stream_callbacks.handlestanza(session, stanza)
124 stanza = session.filter("stanzas/in", stanza); 123 stanza = session.filter("stanzas/in", stanza);
125 t_insert(session.pending_stanzas, stanza); 124 session.thread:run(stanza);
126 end 125 end
127 126
128 --- Session methods 127 --- Session methods
129 local function session_close(session, reason) 128 local function session_close(session, reason)
130 local log = session.log or log; 129 local log = session.log or log;
187 session:close{ condition = "not-authorized", text = "Account deleted" }; 186 session:close{ condition = "not-authorized", text = "Account deleted" };
188 end 187 end
189 end 188 end
190 end, 200); 189 end, 200);
191 190
191 function runner_callbacks:ready()
192 self.data.conn:resume();
193 end
194
195 function runner_callbacks:waiting()
196 self.data.conn:pause();
197 end
198
199 function runner_callbacks:error(err)
200 (self.data.log or log)("error", "Traceback[c2s]: %s", err);
201 end
202
192 --- Port listener 203 --- Port listener
193 function listener.onconnect(conn) 204 function listener.onconnect(conn)
194 local session = sm_new_session(conn); 205 local session = sm_new_session(conn);
195 sessions[conn] = session; 206 sessions[conn] = session;
196 207
222 function session.reset_stream() 233 function session.reset_stream()
223 session.notopen = true; 234 session.notopen = true;
224 session.stream:reset(); 235 session.stream:reset();
225 end 236 end
226 237
227 session.thread = coroutine.create(function (stanza) 238 session.thread = runner(function (stanza)
228 while true do 239 core_process_stanza(session, stanza);
229 core_process_stanza(session, stanza); 240 end, runner_callbacks, session);
230 stanza = coroutine.yield("ready");
231 end
232 end);
233
234 session.pending_stanzas = {};
235 241
236 local filter = session.filter; 242 local filter = session.filter;
237 function session.data(data) 243 function session.data(data)
238 -- Parse the data, which will store stanzas in session.pending_stanzas 244 -- Parse the data, which will store stanzas in session.pending_stanzas
239 if data then 245 if data then
244 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); 250 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
245 session:close("not-well-formed"); 251 session:close("not-well-formed");
246 end 252 end
247 end 253 end
248 end 254 end
249
250 if co_running() ~= session.thread and not session.paused then
251 if session.state == "wait" then
252 session.state = "ready";
253 local ok, state = co_resume(session.thread);
254 if not ok then
255 log("error", "Traceback[c2s]: %s", state);
256 elseif state == "wait" then
257 return;
258 end
259 end
260 -- We're not currently running, so start the thread to process pending stanzas
261 local s, thread = session.pending_stanzas, session.thread;
262 local n = #s;
263 while n > 0 and session.state ~= "wait" do
264 session.log("debug", "processing %d stanzas", n);
265 local consumed;
266 for i = 1,n do
267 local stanza = s[i];
268 local ok, state = co_resume(thread, stanza);
269 if not ok then
270 log("error", "Traceback[c2s]: %s", state);
271 elseif state == "wait" then
272 consumed = i;
273 session.state = "wait";
274 break;
275 end
276 end
277 if not consumed then consumed = n; end
278 for i = 1, #s do
279 s[i] = s[consumed+i];
280 end
281 n = #s;
282 end
283 end
284 end 255 end
285 256
286 if c2s_timeout then 257 if c2s_timeout then
287 add_task(c2s_timeout, function () 258 add_task(c2s_timeout, function ()
288 if session.type == "c2s_unauthed" then 259 if session.type == "c2s_unauthed" then
290 end 261 end
291 end); 262 end);
292 end 263 end
293 264
294 session.dispatch_stanza = stream_callbacks.handlestanza; 265 session.dispatch_stanza = stream_callbacks.handlestanza;
295
296 function session:sleep(by)
297 session.log("debug", "Sleeping for %s", by);
298 session.paused = by or "?";
299 session.conn:pause();
300 if co_running() == session.thread then
301 coroutine.yield("wait");
302 end
303 end
304 function session:wake(by)
305 assert(session.paused == (by or "?"));
306 session.log("debug", "Waking for %s", by);
307 session.paused = nil;
308 session.conn:resume();
309 session.data(); --FIXME: next tick?
310 end
311 end 266 end
312 267
313 function listener.onincoming(conn, data) 268 function listener.onincoming(conn, data)
314 local session = sessions[conn]; 269 local session = sessions[conn];
315 if session then 270 if session then