Software / code / prosody
Comparison
plugins/mod_c2s.lua @ 5773:c9a712673d8a
mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Fri, 09 Aug 2013 11:10:22 +0100 |
| parent | 5764:969e0a054795 |
| child | 5776:bd0ff8ae98a8 |
comparison
equal
deleted
inserted
replaced
| 5772:9cef4b5c2fe3 | 5773:c9a712673d8a |
|---|---|
| 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; | 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
| 17 local uuid_generate = require "util.uuid".generate; | 17 local uuid_generate = require "util.uuid".generate; |
| 18 | 18 |
| 19 local xpcall, tostring, type = xpcall, tostring, type; | 19 local xpcall, tostring, type = xpcall, tostring, type; |
| 20 local traceback = debug.traceback; | 20 local traceback = debug.traceback; |
| 21 local t_insert, t_remove = table.insert, table.remove; | |
| 22 local co_running, co_resume = coroutine.running, coroutine.resume; | |
| 21 | 23 |
| 22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
| 23 | 25 |
| 24 local log = module._log; | 26 local log = module._log; |
| 25 | 27 |
| 29 | 31 |
| 30 local sessions = module:shared("sessions"); | 32 local sessions = module:shared("sessions"); |
| 31 local core_process_stanza = prosody.core_process_stanza; | 33 local core_process_stanza = prosody.core_process_stanza; |
| 32 local hosts = prosody.hosts; | 34 local hosts = prosody.hosts; |
| 33 | 35 |
| 34 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; | 36 local stream_callbacks = { default_ns = "jabber:client" }; |
| 35 local listener = {}; | 37 local listener = {}; |
| 36 | 38 |
| 37 --- Stream events handlers | 39 --- Stream events handlers |
| 38 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
| 39 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; | 41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; |
| 118 end | 120 end |
| 119 | 121 |
| 120 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end | 122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end |
| 121 function stream_callbacks.handlestanza(session, stanza) | 123 function stream_callbacks.handlestanza(session, stanza) |
| 122 stanza = session.filter("stanzas/in", stanza); | 124 stanza = session.filter("stanzas/in", stanza); |
| 123 if stanza then | 125 t_insert(session.pending_stanzas, stanza); |
| 124 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); | |
| 125 end | |
| 126 end | 126 end |
| 127 | 127 |
| 128 --- Session methods | 128 --- Session methods |
| 129 local function session_close(session, reason) | 129 local function session_close(session, reason) |
| 130 local log = session.log or log; | 130 local log = session.log or log; |
| 222 function session.reset_stream() | 222 function session.reset_stream() |
| 223 session.notopen = true; | 223 session.notopen = true; |
| 224 session.stream:reset(); | 224 session.stream:reset(); |
| 225 end | 225 end |
| 226 | 226 |
| 227 session.thread = coroutine.create(function (stanza) | |
| 228 while true do | |
| 229 core_process_stanza(session, stanza); | |
| 230 stanza = coroutine.yield("ready"); | |
| 231 end | |
| 232 end); | |
| 233 | |
| 234 session.pending_stanzas = {}; | |
| 235 | |
| 227 local filter = session.filter; | 236 local filter = session.filter; |
| 228 function session.data(data) | 237 function session.data(data) |
| 229 data = filter("bytes/in", data); | 238 -- Parse the data, which will store stanzas in session.pending_stanzas |
| 230 if data then | 239 if data then |
| 231 local ok, err = stream:feed(data); | 240 data = filter("bytes/in", data); |
| 232 if ok then return; end | 241 if data then |
| 233 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); | 242 local ok, err = stream:feed(data); |
| 234 session:close("not-well-formed"); | 243 if not ok then |
| 235 end | 244 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
| 236 end | 245 session:close("not-well-formed"); |
| 237 | 246 end |
| 238 | 247 end |
| 248 end | |
| 249 | |
| 250 if co_running() ~= session.thread and not session.paused then | |
| 251 if session.state == "wait" then | |
| 252 session.state = "ready"; | |
| 253 local ok, state = co_resume(session.thread); | |
| 254 if not ok then | |
| 255 log("error", "Traceback[c2s]: %s", state); | |
| 256 elseif state == "wait" then | |
| 257 return; | |
| 258 end | |
| 259 end | |
| 260 -- We're not currently running, so start the thread to process pending stanzas | |
| 261 local s, thread = session.pending_stanzas, session.thread; | |
| 262 local n = #s; | |
| 263 while n > 0 and session.state ~= "wait" do | |
| 264 session.log("debug", "processing %d stanzas", n); | |
| 265 local consumed; | |
| 266 for i = 1,n do | |
| 267 local stanza = s[i]; | |
| 268 local ok, state = co_resume(thread, stanza); | |
| 269 if not ok then | |
| 270 log("error", "Traceback[c2s]: %s", state); | |
| 271 elseif state == "wait" then | |
| 272 consumed = i; | |
| 273 session.state = "wait"; | |
| 274 break; | |
| 275 end | |
| 276 end | |
| 277 if not consumed then consumed = n; end | |
| 278 for i = 1, #s do | |
| 279 s[i] = s[consumed+i]; | |
| 280 end | |
| 281 n = #s; | |
| 282 end | |
| 283 end | |
| 284 end | |
| 285 | |
| 239 if c2s_timeout then | 286 if c2s_timeout then |
| 240 add_task(c2s_timeout, function () | 287 add_task(c2s_timeout, function () |
| 241 if session.type == "c2s_unauthed" then | 288 if session.type == "c2s_unauthed" then |
| 242 session:close("connection-timeout"); | 289 session:close("connection-timeout"); |
| 243 end | 290 end |
| 244 end); | 291 end); |
| 245 end | 292 end |
| 246 | 293 |
| 247 session.dispatch_stanza = stream_callbacks.handlestanza; | 294 session.dispatch_stanza = stream_callbacks.handlestanza; |
| 295 | |
| 296 function session:sleep(by) | |
| 297 session.log("debug", "Sleeping for %s", by); | |
| 298 session.paused = by or "?"; | |
| 299 session.conn:pause(); | |
| 300 if co_running() == session.thread then | |
| 301 coroutine.yield("wait"); | |
| 302 end | |
| 303 end | |
| 304 function session:wake(by) | |
| 305 assert(session.paused == (by or "?")); | |
| 306 session.log("debug", "Waking for %s", by); | |
| 307 session.paused = nil; | |
| 308 session.conn:resume(); | |
| 309 session.data(); --FIXME: next tick? | |
| 310 end | |
| 248 end | 311 end |
| 249 | 312 |
| 250 function listener.onincoming(conn, data) | 313 function listener.onincoming(conn, data) |
| 251 local session = sessions[conn]; | 314 local session = sessions[conn]; |
| 252 if session then | 315 if session then |