Software /
code /
prosody
Comparison
plugins/mod_c2s.lua @ 5789:3b05a86631b9
mod_c2s: Port coroutine code to util.async
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 11 Aug 2013 14:46:27 +0100 |
parent | 5776:bd0ff8ae98a8 |
child | 5801:224644752bf4 |
comparison
equal
deleted
inserted
replaced
5788:3556f338caa3 | 5789:3b05a86631b9 |
---|---|
13 local nameprep = require "util.encodings".stringprep.nameprep; | 13 local nameprep = require "util.encodings".stringprep.nameprep; |
14 local sessionmanager = require "core.sessionmanager"; | 14 local sessionmanager = require "core.sessionmanager"; |
15 local st = require "util.stanza"; | 15 local st = require "util.stanza"; |
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; | 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
17 local uuid_generate = require "util.uuid".generate; | 17 local uuid_generate = require "util.uuid".generate; |
18 local runner = require "util.async".runner; | |
18 | 19 |
19 local xpcall, tostring, type = xpcall, tostring, type; | 20 local xpcall, tostring, type = xpcall, tostring, type; |
20 local traceback = debug.traceback; | |
21 local t_insert, t_remove = table.insert, table.remove; | 21 local t_insert, t_remove = table.insert, table.remove; |
22 local co_running, co_resume = coroutine.running, coroutine.resume; | |
23 | 22 |
24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 23 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
25 | 24 |
26 local log = module._log; | 25 local log = module._log; |
27 | 26 |
33 local core_process_stanza = prosody.core_process_stanza; | 32 local core_process_stanza = prosody.core_process_stanza; |
34 local hosts = prosody.hosts; | 33 local hosts = prosody.hosts; |
35 | 34 |
36 local stream_callbacks = { default_ns = "jabber:client" }; | 35 local stream_callbacks = { default_ns = "jabber:client" }; |
37 local listener = {}; | 36 local listener = {}; |
37 local runner_callbacks = {}; | |
38 | 38 |
39 --- Stream events handlers | 39 --- Stream events handlers |
40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | 41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; |
42 | 42 |
117 session.log("info", "Session closed by remote with error: %s", text); | 117 session.log("info", "Session closed by remote with error: %s", text); |
118 session:close(nil, text); | 118 session:close(nil, text); |
119 end | 119 end |
120 end | 120 end |
121 | 121 |
122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end | |
123 function stream_callbacks.handlestanza(session, stanza) | 122 function stream_callbacks.handlestanza(session, stanza) |
124 stanza = session.filter("stanzas/in", stanza); | 123 stanza = session.filter("stanzas/in", stanza); |
125 t_insert(session.pending_stanzas, stanza); | 124 session.thread:run(stanza); |
126 end | 125 end |
127 | 126 |
128 --- Session methods | 127 --- Session methods |
129 local function session_close(session, reason) | 128 local function session_close(session, reason) |
130 local log = session.log or log; | 129 local log = session.log or log; |
187 session:close{ condition = "not-authorized", text = "Account deleted" }; | 186 session:close{ condition = "not-authorized", text = "Account deleted" }; |
188 end | 187 end |
189 end | 188 end |
190 end, 200); | 189 end, 200); |
191 | 190 |
191 function runner_callbacks:ready() | |
192 self.data.conn:resume(); | |
193 end | |
194 | |
195 function runner_callbacks:waiting() | |
196 self.data.conn:pause(); | |
197 end | |
198 | |
199 function runner_callbacks:error(err) | |
200 (self.data.log or log)("error", "Traceback[c2s]: %s", err); | |
201 end | |
202 | |
192 --- Port listener | 203 --- Port listener |
193 function listener.onconnect(conn) | 204 function listener.onconnect(conn) |
194 local session = sm_new_session(conn); | 205 local session = sm_new_session(conn); |
195 sessions[conn] = session; | 206 sessions[conn] = session; |
196 | 207 |
222 function session.reset_stream() | 233 function session.reset_stream() |
223 session.notopen = true; | 234 session.notopen = true; |
224 session.stream:reset(); | 235 session.stream:reset(); |
225 end | 236 end |
226 | 237 |
227 session.thread = coroutine.create(function (stanza) | 238 session.thread = runner(function (stanza) |
228 while true do | 239 core_process_stanza(session, stanza); |
229 core_process_stanza(session, stanza); | 240 end, runner_callbacks, session); |
230 stanza = coroutine.yield("ready"); | |
231 end | |
232 end); | |
233 | |
234 session.pending_stanzas = {}; | |
235 | 241 |
236 local filter = session.filter; | 242 local filter = session.filter; |
237 function session.data(data) | 243 function session.data(data) |
238 -- Parse the data, which will store stanzas in session.pending_stanzas | 244 -- Parse the data, which will store stanzas in session.pending_stanzas |
239 if data then | 245 if data then |
244 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | 250 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
245 session:close("not-well-formed"); | 251 session:close("not-well-formed"); |
246 end | 252 end |
247 end | 253 end |
248 end | 254 end |
249 | |
250 if co_running() ~= session.thread and not session.paused then | |
251 if session.state == "wait" then | |
252 session.state = "ready"; | |
253 local ok, state = co_resume(session.thread); | |
254 if not ok then | |
255 log("error", "Traceback[c2s]: %s", state); | |
256 elseif state == "wait" then | |
257 return; | |
258 end | |
259 end | |
260 -- We're not currently running, so start the thread to process pending stanzas | |
261 local s, thread = session.pending_stanzas, session.thread; | |
262 local n = #s; | |
263 while n > 0 and session.state ~= "wait" do | |
264 session.log("debug", "processing %d stanzas", n); | |
265 local consumed; | |
266 for i = 1,n do | |
267 local stanza = s[i]; | |
268 local ok, state = co_resume(thread, stanza); | |
269 if not ok then | |
270 log("error", "Traceback[c2s]: %s", state); | |
271 elseif state == "wait" then | |
272 consumed = i; | |
273 session.state = "wait"; | |
274 break; | |
275 end | |
276 end | |
277 if not consumed then consumed = n; end | |
278 for i = 1, #s do | |
279 s[i] = s[consumed+i]; | |
280 end | |
281 n = #s; | |
282 end | |
283 end | |
284 end | 255 end |
285 | 256 |
286 if c2s_timeout then | 257 if c2s_timeout then |
287 add_task(c2s_timeout, function () | 258 add_task(c2s_timeout, function () |
288 if session.type == "c2s_unauthed" then | 259 if session.type == "c2s_unauthed" then |
290 end | 261 end |
291 end); | 262 end); |
292 end | 263 end |
293 | 264 |
294 session.dispatch_stanza = stream_callbacks.handlestanza; | 265 session.dispatch_stanza = stream_callbacks.handlestanza; |
295 | |
296 function session:sleep(by) | |
297 session.log("debug", "Sleeping for %s", by); | |
298 session.paused = by or "?"; | |
299 session.conn:pause(); | |
300 if co_running() == session.thread then | |
301 coroutine.yield("wait"); | |
302 end | |
303 end | |
304 function session:wake(by) | |
305 assert(session.paused == (by or "?")); | |
306 session.log("debug", "Waking for %s", by); | |
307 session.paused = nil; | |
308 session.conn:resume(); | |
309 session.data(); --FIXME: next tick? | |
310 end | |
311 end | 266 end |
312 | 267 |
313 function listener.onincoming(conn, data) | 268 function listener.onincoming(conn, data) |
314 local session = sessions[conn]; | 269 local session = sessions[conn]; |
315 if session then | 270 if session then |