Comparison

plugins/mod_s2s/mod_s2s.lua @ 7451:464a8a8de625

mod_s2s: Add util.async support
author Kim Alvefur <zash@zash.se>
date Mon, 30 May 2016 13:36:43 +0200
parent 7450:3ae19750cd46
child 7467:9a73c85baffe
Side-by-side comparison
Legend: equal | deleted | inserted | replaced
Left column: 7450:3ae19750cd46 (parent) — Right column: 7451:464a8a8de625 (this changeset)
24 local s2s_new_incoming = require "core.s2smanager".new_incoming; 24 local s2s_new_incoming = require "core.s2smanager".new_incoming;
25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; 25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
26 local s2s_destroy_session = require "core.s2smanager".destroy_session; 26 local s2s_destroy_session = require "core.s2smanager".destroy_session;
27 local uuid_gen = require "util.uuid".generate; 27 local uuid_gen = require "util.uuid".generate;
28 local fire_global_event = prosody.events.fire_event; 28 local fire_global_event = prosody.events.fire_event;
29 local runner = require "util.async".runner;
29 30
30 local s2sout = module:require("s2sout"); 31 local s2sout = module:require("s2sout");
31 32
32 local connect_timeout = module:get_option_number("s2s_timeout", 90); 33 local connect_timeout = module:get_option_number("s2s_timeout", 90);
33 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); 34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
38 local require_encryption = module:get_option_boolean("s2s_require_encryption", false); 39 local require_encryption = module:get_option_boolean("s2s_require_encryption", false);
39 40
40 local measure_connections = module:measure("connections", "counter"); 41 local measure_connections = module:measure("connections", "counter");
41 42
42 local sessions = module:shared("sessions"); 43 local sessions = module:shared("sessions");
44
45 local runner_callbacks = {};
43 46
44 local log = module._log; 47 local log = module._log;
45 48
46 --- Handle stanzas to remote domains 49 --- Handle stanzas to remote domains
47 50
255 return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert }); 258 return module:fire_event("s2s-check-certificate", { host = host, session = session, cert = cert });
256 end 259 end
257 260
258 --- XMPP stream event handlers 261 --- XMPP stream event handlers
259 262
260 local stream_callbacks = { default_ns = "jabber:server", handlestanza = core_process_stanza }; 263 local stream_callbacks = { default_ns = "jabber:server" };
264
265 function stream_callbacks.handlestanza(session, stanza)
266 stanza = session.filter("stanzas/in", stanza);
267 session.thread:run(stanza);
268 end
261 269
262 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 270 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
263 271
264 function stream_callbacks.streamopened(session, attr) 272 function stream_callbacks.streamopened(session, attr)
273 -- run _streamopened in async context
274 session.thread:run({ attr = attr });
275 end
276
277 function stream_callbacks._streamopened(session, attr)
265 session.version = tonumber(attr.version) or 0; 278 session.version = tonumber(attr.version) or 0;
266 279
267 -- TODO: Rename session.secure to session.encrypted 280 -- TODO: Rename session.secure to session.encrypted
268 if session.secure == false then 281 if session.secure == false then
269 session.secure = true; 282 session.secure = true;
433 session.log("info", "Session closed by remote with error: %s", text); 446 session.log("info", "Session closed by remote with error: %s", text);
434 session:close(nil, text); 447 session:close(nil, text);
435 end 448 end
436 end 449 end
437 450
438 local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end
439 function stream_callbacks.handlestanza(session, stanza)
440 stanza = session.filter("stanzas/in", stanza);
441 if stanza then
442 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
443 end
444 end
445
446 local listener = {}; 451 local listener = {};
447 452
448 --- Session methods 453 --- Session methods
449 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 454 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
450 local function session_close(session, reason, remote_reason) 455 local function session_close(session, reason, remote_reason)
515 end 520 end
516 521
517 -- Session initialization logic shared by incoming and outgoing 522 -- Session initialization logic shared by incoming and outgoing
518 local function initialize_session(session) 523 local function initialize_session(session)
519 local stream = new_xmpp_stream(session, stream_callbacks); 524 local stream = new_xmpp_stream(session, stream_callbacks);
525
526 session.thread = runner(function (stanza)
527 if stanza.name == nil then
528 stream_callbacks._streamopened(session, stanza.attr);
529 else
530 core_process_stanza(session, stanza);
531 end
532 end, runner_callbacks, session);
533
520 local log = session.log or log; 534 local log = session.log or log;
521 session.stream = stream; 535 session.stream = stream;
522 536
523 session.notopen = true; 537 session.notopen = true;
524 538
576 -- Not connected, need to close session and clean up 590 -- Not connected, need to close session and clean up
577 (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity", 591 (session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
578 session.from_host or "(unknown)", session.to_host or "(unknown)"); 592 session.from_host or "(unknown)", session.to_host or "(unknown)");
579 session:close("connection-timeout"); 593 session:close("connection-timeout");
580 end); 594 end);
595 end
596
597 function runner_callbacks:ready()
598 self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
599 self.data.conn:resume();
600 end
601
602 function runner_callbacks:waiting()
603 self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
604 self.data.conn:pause();
605 end
606
607 function runner_callbacks:error(err)
608 (self.data.log or log)("error", "Traceback[s2s]: %s", err);
581 end 609 end
582 610
583 function listener.onconnect(conn) 611 function listener.onconnect(conn)
584 measure_connections(1); 612 measure_connections(1);
585 conn:setoption("keepalive", opt_keepalives); 613 conn:setoption("keepalive", opt_keepalives);