Software /
code /
prosody
Comparison
plugins/mod_c2s.lua @ 5773:c9a712673d8a
mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 09 Aug 2013 11:10:22 +0100 |
parent | 5764:969e0a054795 |
child | 5776:bd0ff8ae98a8 |
comparison
equal
deleted
inserted
replaced
5772:9cef4b5c2fe3 | 5773:c9a712673d8a |
---|---|
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; | 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
17 local uuid_generate = require "util.uuid".generate; | 17 local uuid_generate = require "util.uuid".generate; |
18 | 18 |
19 local xpcall, tostring, type = xpcall, tostring, type; | 19 local xpcall, tostring, type = xpcall, tostring, type; |
20 local traceback = debug.traceback; | 20 local traceback = debug.traceback; |
| | 21 local t_insert, t_remove = table.insert, table.remove; |
| | 22 local co_running, co_resume = coroutine.running, coroutine.resume; |
21 | 23 |
22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
23 | 25 |
24 local log = module._log; | 26 local log = module._log; |
25 | 27 |
29 | 31 |
30 local sessions = module:shared("sessions"); | 32 local sessions = module:shared("sessions"); |
31 local core_process_stanza = prosody.core_process_stanza; | 33 local core_process_stanza = prosody.core_process_stanza; |
32 local hosts = prosody.hosts; | 34 local hosts = prosody.hosts; |
33 | 35 |
34 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; | 36 local stream_callbacks = { default_ns = "jabber:client" }; |
35 local listener = {}; | 37 local listener = {}; |
36 | 38 |
37 --- Stream events handlers | 39 --- Stream events handlers |
38 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
39 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | 41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; |
118 end | 120 end |
119 | 121 |
120 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end | 122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end |
121 function stream_callbacks.handlestanza(session, stanza) | 123 function stream_callbacks.handlestanza(session, stanza) |
122 stanza = session.filter("stanzas/in", stanza); | 124 stanza = session.filter("stanzas/in", stanza); |
123 if stanza then | 125 t_insert(session.pending_stanzas, stanza); |
124 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); | |
125 end | |
126 end | 126 end |
127 | 127 |
128 --- Session methods | 128 --- Session methods |
129 local function session_close(session, reason) | 129 local function session_close(session, reason) |
130 local log = session.log or log; | 130 local log = session.log or log; |
222 function session.reset_stream() | 222 function session.reset_stream() |
223 session.notopen = true; | 223 session.notopen = true; |
224 session.stream:reset(); | 224 session.stream:reset(); |
225 end | 225 end |
226 | 226 |
| | 227 session.thread = coroutine.create(function (stanza) |
| | 228 while true do |
| | 229 core_process_stanza(session, stanza); |
| | 230 stanza = coroutine.yield("ready"); |
| | 231 end |
| | 232 end); |
| | 233 |
| | 234 session.pending_stanzas = {}; |
| | 235 |
227 local filter = session.filter; | 236 local filter = session.filter; |
228 function session.data(data) | 237 function session.data(data) |
229 data = filter("bytes/in", data); | 238 -- Parse the data, which will store stanzas in session.pending_stanzas |
230 if data then | 239 if data then |
231 local ok, err = stream:feed(data); | 240 data = filter("bytes/in", data); |
232 if ok then return; end | 241 if data then |
233 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | 242 local ok, err = stream:feed(data); |
234 session:close("not-well-formed"); | 243 if not ok then |
235 end | 244 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
236 end | 245 session:close("not-well-formed"); |
237 | 246 end |
238 | 247 end |
| | 248 end |
| | 249 |
| | 250 if co_running() ~= session.thread and not session.paused then |
| | 251 if session.state == "wait" then |
| | 252 session.state = "ready"; |
| | 253 local ok, state = co_resume(session.thread); |
| | 254 if not ok then |
| | 255 log("error", "Traceback[c2s]: %s", state); |
| | 256 elseif state == "wait" then |
| | 257 return; |
| | 258 end |
| | 259 end |
| | 260 -- We're not currently running, so start the thread to process pending stanzas |
| | 261 local s, thread = session.pending_stanzas, session.thread; |
| | 262 local n = #s; |
| | 263 while n > 0 and session.state ~= "wait" do |
| | 264 session.log("debug", "processing %d stanzas", n); |
| | 265 local consumed; |
| | 266 for i = 1,n do |
| | 267 local stanza = s[i]; |
| | 268 local ok, state = co_resume(thread, stanza); |
| | 269 if not ok then |
| | 270 log("error", "Traceback[c2s]: %s", state); |
| | 271 elseif state == "wait" then |
| | 272 consumed = i; |
| | 273 session.state = "wait"; |
| | 274 break; |
| | 275 end |
| | 276 end |
| | 277 if not consumed then consumed = n; end |
| | 278 for i = 1, #s do |
| | 279 s[i] = s[consumed+i]; |
| | 280 end |
| | 281 n = #s; |
| | 282 end |
| | 283 end |
| | 284 end |
| | 285 |
239 if c2s_timeout then | 286 if c2s_timeout then |
240 add_task(c2s_timeout, function () | 287 add_task(c2s_timeout, function () |
241 if session.type == "c2s_unauthed" then | 288 if session.type == "c2s_unauthed" then |
242 session:close("connection-timeout"); | 289 session:close("connection-timeout"); |
243 end | 290 end |
244 end); | 291 end); |
245 end | 292 end |
246 | 293 |
247 session.dispatch_stanza = stream_callbacks.handlestanza; | 294 session.dispatch_stanza = stream_callbacks.handlestanza; |
| | 295 |
| | 296 function session:sleep(by) |
| | 297 session.log("debug", "Sleeping for %s", by); |
| | 298 session.paused = by or "?"; |
| | 299 session.conn:pause(); |
| | 300 if co_running() == session.thread then |
| | 301 coroutine.yield("wait"); |
| | 302 end |
| | 303 end |
| | 304 function session:wake(by) |
| | 305 assert(session.paused == (by or "?")); |
| | 306 session.log("debug", "Waking for %s", by); |
| | 307 session.paused = nil; |
| | 308 session.conn:resume(); |
| | 309 session.data(); --FIXME: next tick? |
| | 310 end |
248 end | 311 end |
249 | 312 |
250 function listener.onincoming(conn, data) | 313 function listener.onincoming(conn, data) |
251 local session = sessions[conn]; | 314 local session = sessions[conn]; |
252 if session then | 315 if session then |