Software / code / prosody
Comparison
plugins/mod_s2s/mod_s2s.lua @ 7451:464a8a8de625
mod_s2s: Add util.async support
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 30 May 2016 13:36:43 +0200 |
parent | 7450:3ae19750cd46 |
child | 7467:9a73c85baffe |
comparison legend: equal, deleted, inserted, replaced
7450:3ae19750cd46 | 7451:464a8a8de625 |
---|---|
24 local s2s_new_incoming = require "core.s2smanager".new_incoming; | 24 local s2s_new_incoming = require "core.s2smanager".new_incoming; |
25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; | 25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; |
26 local s2s_destroy_session = require "core.s2smanager".destroy_session; | 26 local s2s_destroy_session = require "core.s2smanager".destroy_session; |
27 local uuid_gen = require "util.uuid".generate; | 27 local uuid_gen = require "util.uuid".generate; |
28 local fire_global_event = prosody.events.fire_event; | 28 local fire_global_event = prosody.events.fire_event; |
29 local runner = require "util.async".runner; | |
29 | 30 |
30 local s2sout = module:require("s2sout"); | 31 local s2sout = module:require("s2sout"); |
31 | 32 |
32 local connect_timeout = module:get_option_number("s2s_timeout", 90); | 33 local connect_timeout = module:get_option_number("s2s_timeout", 90); |
33 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); | 34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); |
38 local require_encryption = module:get_option_boolean("s2s_require_encryption", false); | 39 local require_encryption = module:get_option_boolean("s2s_require_encryption", false); |
39 | 40 |
40 local measure_connections = module:measure("connections", "counter"); | 41 local measure_connections = module:measure("connections", "counter"); |
41 | 42 |
42 local sessions = module:shared("sessions"); | 43 local sessions = module:shared("sessions"); |
44 | |
45 local runner_callbacks = {}; | |
43 | 46 |
44 local log = module._log; | 47 local log = module._log; |
45 | 48 |
46 --- Handle stanzas to remote domains | 49 --- Handle stanzas to remote domains |
47 | 50 |
255 return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert }); | 258 return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert }); |
256 end | 259 end |
257 | 260 |
258 --- XMPP stream event handlers | 261 --- XMPP stream event handlers |
259 | 262 |
260 local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza }; | 263 local stream_callbacks = { default_ns = "jabber:server" }; |
264 | |
265 function stream_callbacks.handlestanza(session, stanza) | |
266 stanza = session.filter("stanzas/in", stanza); | |
267 session.thread:run(stanza); | |
268 end | |
261 | 269 |
262 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | 270 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
263 | 271 |
264 function stream_callbacks.streamopened(session, attr) | 272 function stream_callbacks.streamopened(session, attr) |
273 -- run _streamopened in async context | |
274 session.thread:run({ attr = attr }); | |
275 end | |
276 | |
277 function stream_callbacks._streamopened(session, attr) | |
265 session.version = tonumber(attr.version) or 0; | 278 session.version = tonumber(attr.version) or 0; |
266 | 279 |
267 -- TODO: Rename session.secure to session.encrypted | 280 -- TODO: Rename session.secure to session.encrypted |
268 if session.secure == false then | 281 if session.secure == false then |
269 session.secure = true; | 282 session.secure = true; |
433 session.log("info", "Session closed by remote with error: %s", text); | 446 session.log("info", "Session closed by remote with error: %s", text); |
434 session:close(nil, text); | 447 session:close(nil, text); |
435 end | 448 end |
436 end | 449 end |
437 | 450 |
438 local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end | |
439 function stream_callbacks.handlestanza(session, stanza) | |
440 stanza = session.filter("stanzas/in", stanza); | |
441 if stanza then | |
442 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); | |
443 end | |
444 end | |
445 | |
446 local listener = {}; | 451 local listener = {}; |
447 | 452 |
448 --- Session methods | 453 --- Session methods |
449 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | 454 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
450 local function session_close(session, reason, remote_reason) | 455 local function session_close(session, reason, remote_reason) |
515 end | 520 end |
516 | 521 |
517 -- Session initialization logic shared by incoming and outgoing | 522 -- Session initialization logic shared by incoming and outgoing |
518 local function initialize_session(session) | 523 local function initialize_session(session) |
519 local stream = new_xmpp_stream(session, stream_callbacks); | 524 local stream = new_xmpp_stream(session, stream_callbacks); |
525 | |
526 session.thread = runner(function (stanza) | |
527 if stanza.name == nil then | |
528 stream_callbacks._streamopened(session, stanza.attr); | |
529 else | |
530 core_process_stanza(session, stanza); | |
531 end | |
532 end, runner_callbacks, session); | |
533 | |
520 local log = session.log or log; | 534 local log = session.log or log; |
521 session.stream = stream; | 535 session.stream = stream; |
522 | 536 |
523 session.notopen = true; | 537 session.notopen = true; |
524 | 538 |
576 -- Not connected, need to close session and clean up | 590 -- Not connected, need to close session and clean up |
577 (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity", | 591 (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity", |
578 session.from_host or "(unknown)", session.to_host or "(unknown)"); | 592 session.from_host or "(unknown)", session.to_host or "(unknown)"); |
579 session:close("connection-timeout"); | 593 session:close("connection-timeout"); |
580 end); | 594 end); |
595 end | |
596 | |
597 function runner_callbacks:ready() | |
598 self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread)); | |
599 self.data.conn:resume(); | |
600 end | |
601 | |
602 function runner_callbacks:waiting() | |
603 self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread)); | |
604 self.data.conn:pause(); | |
605 end | |
606 | |
607 function runner_callbacks:error(err) | |
608 (self.data.log or log)("error", "Traceback[s2s]: %s", err); | |
581 end | 609 end |
582 | 610 |
583 function listener.onconnect(conn) | 611 function listener.onconnect(conn) |
584 measure_connections(1); | 612 measure_connections(1); |
585 conn:setoption("keepalive", opt_keepalives); | 613 conn:setoption("keepalive", opt_keepalives); |