Software /
code /
prosody
Changeset
10855:70ac7d23673d
mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Mon, 01 Jun 2020 15:42:19 +0100 (2020-06-01) |
parents | 10854:472fe13a05f9 |
children | 10856:c99711eda0d1 |
files | plugins/mod_admin_socket.lua util/adminstream.lua |
diffstat | 2 files changed, 354 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/plugins/mod_admin_socket.lua Mon Jun 01 15:42:19 2020 +0100 @@ -0,0 +1,69 @@ +module:set_global(); + +local have_unix, unix = pcall(require, "socket.unix"); + +if not have_unix or type(unix) ~= "table" then + module:log_status("error", "LuaSocket unix socket support not available or incompatible, ensure it is up to date"); + return; +end + +local server = require "net.server"; + +local adminstream = require "util.adminstream"; + +local socket_path = module:get_option_string("admin_socket", prosody.paths.data.."/prosody.sock"); + +local sessions = module:shared("sessions"); + +local function fire_admin_event(session, stanza) + local event_data = { + origin = session, stanza = stanza; + }; + local event_name; + if stanza.attr.xmlns then + event_name = "admin/"..stanza.attr.xmlns..":"..stanza.name; + else + event_name = "admin/"..stanza.name; + end + module:log("debug", "Firing %s", event_name); + return module:fire_event(event_name, event_data); +end + +module:hook("server-stopping", function () + for _, session in pairs(sessions) do + session:close("system-shutdown"); + end + os.remove(socket_path); +end); + +--- Unix domain socket management + +local conn, sock; + +local listeners = adminstream.server(sessions, fire_admin_event).listeners; + +local function accept_connection() + module:log("debug", "accepting..."); + local client = sock:accept(); + if not client then return; end + server.wrapclient(client, "unix", 0, listeners, "*a"); +end + +function module.load() + sock = unix.stream(); + sock:settimeout(0); + os.remove(socket_path); + assert(sock:bind(socket_path)); + assert(sock:listen()); + conn = server.watchfd(sock:getfd(), accept_connection); +end + +function module.unload() + if conn then + conn:close(); + end + if sock then + sock:close(); + end + os.remove(socket_path); +end
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/util/adminstream.lua Mon Jun 01 15:42:19 2020 +0100 @@ -0,0 +1,285 @@ +local st = require "util.stanza"; +local new_xmpp_stream = require "util.xmppstream".new; +local sessionlib = require "util.session"; +local gettime = require "util.time".now; +local runner = require "util.async".runner; +local add_task = require "util.timer".add_task; +local events = require "util.events"; + +local stream_close_timeout = 5; + +local log = require "util.logger".init("adminstream"); + +local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; + +local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" }; + +function stream_callbacks.streamopened(session, attr) + -- run _streamopened in async context + session.thread:run({ stream = "opened", attr = attr }); +end + +function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr + if session.type ~= "client" then + session:open_stream(); + end + session.notopen = nil; +end + +function stream_callbacks.streamclosed(session, attr) + -- run _streamclosed in async context + session.thread:run({ stream = "closed", attr = attr }); +end + +function stream_callbacks._streamclosed(session) + session.log("debug", "Received </stream:stream>"); + session:close(false); +end + +function stream_callbacks.error(session, error, data) + if error == "no-stream" then + session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); + session:close("invalid-namespace"); + elseif error == "parse-error" then + session.log("debug", "Client XML parse error: %s", data); + session:close("not-well-formed"); + elseif error == "stream-error" then + local condition, text = "undefined-condition"; + for child in data:childtags(nil, xmlns_xmpp_streams) do + if child.name ~= "text" then + condition = child.name; + else + text = child:get_text(); + end + if condition ~= "undefined-condition" and text then + break; + end + end + text = condition .. (text and (" ("..text..")") or ""); + session.log("info", "Session closed by remote with error: %s", text); + session:close(nil, text); + end +end + +function stream_callbacks.handlestanza(session, stanza) + session.thread:run(stanza); +end + +local runner_callbacks = {}; + +function runner_callbacks:error(err) + self.data.log("error", "Traceback[c2s]: %s", err); +end + +local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; + +local function destroy_session(session, reason) + if session.destroyed then return; end + session.destroyed = true; + session.log("debug", "Destroying session: %s", reason or "unknown reason"); +end + +local function session_close(session, reason) + local log = session.log or log; + if session.conn then + if session.notopen then + session:open_stream(); + end + if reason then -- nil == no err, initiated by us, false == initiated by client + local stream_error = st.stanza("stream:error"); + if type(reason) == "string" then -- assume stream error + stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); + elseif type(reason) == "table" then + if reason.condition then + stream_error:tag(reason.condition, stream_xmlns_attr):up(); + if reason.text then + stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); + end + if reason.extra then + stream_error:add_child(reason.extra); + end + elseif reason.name then -- a stanza + stream_error = reason; + end + end + stream_error = tostring(stream_error); + log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); + session.send(stream_error); + end + + session.send("</stream:stream>"); + function session.send() return false; end + + local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; + session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed"); + + -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote + local conn = session.conn; + if reason_text == nil and not session.notopen and session.type == "c2s" then + -- Grace time to process data from authenticated cleanly-closed stream + add_task(stream_close_timeout, function () + if not session.destroyed then + session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); + destroy_session(session); + conn:close(); + end + end); + else + destroy_session(session, reason_text); + conn:close(); + end + else + local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; + destroy_session(session, reason_text); + end +end + +--- Public methods + +local function new_server(sessions, stanza_handler) + local listeners = {}; + + function listeners.onconnect(conn) + log("debug", "New connection"); + local session = sessionlib.new("admin"); + sessionlib.set_id(session); + sessionlib.set_logger(session); + sessionlib.set_conn(session, conn); + + session.conntime = gettime(); + session.type = "admin"; + + local stream = new_xmpp_stream(session, stream_callbacks); + session.stream = stream; + session.notopen = true; + + session.thread = runner(function (stanza) + if st.is_stanza(stanza) then + stanza_handler(session, stanza); + elseif stanza.stream == "opened" then + stream_callbacks._streamopened(session, stanza.attr); + elseif stanza.stream == "closed" then + stream_callbacks._streamclosed(session, stanza.attr); + end + end, runner_callbacks, session); + + function session.data(data) + -- Parse the data, which will store stanzas in session.pending_stanzas + if data then + local ok, err = stream:feed(data); + if not ok then + session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); + session:close("not-well-formed"); + end + end + end + + session.close = session_close; + + session.send = function (t) + session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?")); + return session.rawsend(tostring(t)); + end + + function session.rawsend(t) + local ret, err = conn:write(t); + if not ret then + session.log("debug", "Error writing to connection: %s", err); + return false, err; + end + return true; + end + + sessions[conn] = session; + end + + function listeners.onincoming(conn, data) + local session = sessions[conn]; + if session then + session.data(data); + end + end + + function listeners.ondisconnect(conn, err) + local session = sessions[conn]; + if session then + session.log("info", "Admin client disconnected: %s", err or "connection closed"); + session.conn = nil; + sessions[conn] = nil; + end + end + return { + listeners = listeners; + }; +end + +local function new_client() + local client = { + type = "client"; + events = events.new(); + log = log; + }; + + local listeners = {}; + + function listeners.onconnect(conn) + log("debug", "Connected"); + client.conn = conn; + + local stream = new_xmpp_stream(client, stream_callbacks); + client.stream = stream; + client.notopen = true; + + client.thread = runner(function (stanza) + if st.is_stanza(stanza) then + client.events.fire_event("received", stanza); + elseif stanza.stream == "opened" then + stream_callbacks._streamopened(client, stanza.attr); + client.events.fire_event("connected"); + elseif stanza.stream == "closed" then + client.events.fire_event("disconnected"); + stream_callbacks._streamclosed(client, stanza.attr); + end + end, runner_callbacks, client); + + client.close = session_close; + + function client.send(t) + client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?")); + return client.rawsend(tostring(t)); + end + + function client.rawsend(t) + local ret, err = conn:write(t); + if not ret then + client.log("debug", "Error writing to connection: %s", err); + return false, err; + end + return true; + end + client.log("debug", "Opening stream..."); + client:open_stream(); + end + + function listeners.onincoming(conn, data) --luacheck: ignore 212/conn + local ok, err = client.stream:feed(data); + if not ok then + client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); + client:close("not-well-formed"); + end + end + + function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn + client.log("info", "Admin client disconnected: %s", err or "connection closed"); + client.conn = nil; + end + + client.listeners = listeners; + + return client; +end + +return { + server = new_server; + client = new_client; +};