Software /
code /
prosody
File
util/adminstream.lua @ 13652:a08065207ef0
net.server_epoll: Call :shutdown() on TLS sockets when supported
Comment from Matthew:
This fixes a potential issue where the Prosody process gets blocked on sockets
waiting for them to close. Unlike non-TLS sockets, closing a TLS socket sends
layer 7 data, and this can cause problems for sockets which are in the process
of being cleaned up.
This depends on LuaSec changes which are not yet upstream.
From Martijn's original email:
So first my analysis of luasec. in ssl.c the socket is put into blocking
mode right before calling SSL_shutdown() inside meth_destroy(). My best
guess to why this is is because meth_destroy is linked to the __close
and __gc methods, which can't exactly be called multiple times and
luasec does want to make sure that a tls session is shutdown as clean
as possible.
I can't say I disagree with this reasoning and don't want to change this
behaviour. My solution to this without changing the current behaviour is
to introduce a shutdown() method. I am aware that this overlaps in a
conflicting way with tcp's shutdown method, but it stays close to the
OpenSSL name. This method calls SSL_shutdown() in the current
(non)blocking mode of the underlying socket and returns a boolean
whether or not the shutdown is completed (matching SSL_shutdown()'s 0
or 1 return values), and returns the familiar ssl_ioerror() strings on
error with a false for completion. This error can then be used to
determine if we have wantread/wantwrite to finalize things. Once
meth_shutdown() has been called once a shutdown flag will be set, which
indicates to meth_destroy() that the SSL_shutdown() has been handled
by the application and it shouldn't be needed to set the socket to
blocking mode. I've left the SSL_shutdown() call in the
LSEC_STATE_CONNECTED to prevent TOCTOU if the application reaches a
timeout for the shutdown code, which might allow SSL_shutdown() to
clean up anyway at the last possible moment.
Another thing I've changed to luasec is the call to socket_setblocking()
right before calling close(2) in socket_destroy() in usocket.c.
According to the latest POSIX[0]:
Note that the requirement for close() on a socket to block for up to
the current linger interval is not conditional on the O_NONBLOCK
setting.
Which I read to mean that removing O_NONBLOCK on the socket before close
doesn't impact the behaviour and only causes noise in system call
tracers. I didn't touch the windows bits of this, since I don't do
windows.
For the prosody side of things I've made the TLS shutdown bits resemble
interface:onwritable(), and put it under a combined guard of self._tls
and self.conn.shutdown. The self._tls bit is there to prevent getting
stuck on this condition, and self.conn.shutdown is there to prevent the
code being called by instances where the patched luasec isn't deployed.
The destroy() method can be called from various places and is read by
me as the "we give up" error path. To accommodate for these unexpected
entrypoints I've added a single call to self.conn:shutdown() to prevent
the socket being put into blocking mode. I have no expectations that
there is any other use here. Same as previous, the self.conn.shutdown
check is there to make sure it's not called on unpatched luasec
deployments and self._tls is there to make sure we don't call shutdown()
on tcp sockets.
I wouldn't recommend logging of the conn:shutdown() error inside
close(), since a lot of clients simply close the connection before
SSL_shutdown() is done.
author | Martijn van Duren <martijn@openbsd.org> |
---|---|
date | Thu, 06 Feb 2025 15:04:38 +0000 |
parent | 13589:b1b931d5fee8 |
line wrap: on
line source
local st = require "prosody.util.stanza"; local new_xmpp_stream = require "prosody.util.xmppstream".new; local sessionlib = require "prosody.util.session"; local gettime = require "prosody.util.time".now; local runner = require "prosody.util.async".runner; local add_task = require "prosody.util.timer".add_task; local events = require "prosody.util.events"; local server = require "prosody.net.server"; local stream_close_timeout = 5; local log = require "prosody.util.logger".init("adminstream"); local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" }; function stream_callbacks.streamopened(session, attr) -- run _streamopened in async context session.thread:run({ stream = "opened", attr = attr }); end function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr if session.type ~= "client" then session:open_stream(); end session.notopen = nil; end function stream_callbacks.streamclosed(session, attr) -- run _streamclosed in async context session.thread:run({ stream = "closed", attr = attr }); end function stream_callbacks._streamclosed(session) session.log("debug", "Received </stream:stream>"); session:close(false); end function stream_callbacks.error(session, error, data) if error == "no-stream" then session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); session:close("invalid-namespace"); elseif error == "parse-error" then session.log("debug", "Client XML parse error: %s", data); session:close("not-well-formed"); elseif error == "stream-error" then local condition, text = "undefined-condition"; for child in data:childtags(nil, xmlns_xmpp_streams) do if child.name ~= "text" then condition = child.name; else text = child:get_text(); end if condition ~= "undefined-condition" and text then break; end end text = condition .. (text and (" ("..text..")") or ""); session.log("info", "Session closed by remote with error: %s", text); session:close(nil, text); end end function stream_callbacks.handlestanza(session, stanza) session.thread:run(stanza); end local runner_callbacks = {}; function runner_callbacks:error(err) self.data.log("error", "Traceback[c2s]: %s", err); end local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; local function destroy_session(session, reason) if session.destroyed then return; end session.destroyed = true; session.log("debug", "Destroying session: %s", reason or "unknown reason"); end local function session_close(session, reason) local log = session.log or log; if session.conn then if session.notopen then session:open_stream(); end if reason then -- nil == no err, initiated by us, false == initiated by client local stream_error = st.stanza("stream:error"); if type(reason) == "string" then -- assume stream error stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); elseif type(reason) == "table" then if reason.condition then stream_error:tag(reason.condition, stream_xmlns_attr):up(); if reason.text then stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); end if reason.extra then stream_error:add_child(reason.extra); end elseif reason.name then -- a stanza stream_error = reason; end end stream_error = tostring(stream_error); log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); session.send(stream_error); end session.send("</stream:stream>"); function session.send() return false; end local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed"); -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote local conn = session.conn; if reason_text == nil and not session.notopen and session.type == "c2s" then -- Grace time to process data from authenticated cleanly-closed stream add_task(stream_close_timeout, function () if not session.destroyed then session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); destroy_session(session); conn:close(); end end); else destroy_session(session, reason_text); conn:close(); end else local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; destroy_session(session, reason_text); end end --- Public methods local function new_connection(socket_path, listeners) local have_unix, unix = pcall(require, "socket.unix"); if have_unix and type(unix) == "function" then -- COMPAT #1717 -- Before the introduction of datagram support, only the stream socket -- constructor was exported instead of a module table. Due to the lack of a -- proper release of LuaSocket, distros have settled on shipping either the -- last RC tag or some commit since then. -- Here we accommodate both variants. unix = { stream = unix }; end if type(unix) ~= "table" then have_unix = false; end local conn, sock; return { connect = function () if not have_unix then return nil, "no unix socket support"; end if sock or conn then return nil, "already connected"; end sock = unix.stream(); sock:settimeout(0); local ok, err = sock:connect(socket_path); if not ok then return nil, err; end conn = server.wrapclient(sock, nil, nil, listeners, "*a"); return true; end; disconnect = function () if conn then conn:close(); conn = nil; end if sock then sock:close(); sock = nil; end return true; end; }; end local function new_server(sessions, stanza_handler) local s = { events = events.new(); listeners = {}; }; function s.listeners.onconnect(conn) log("debug", "New connection"); local session = sessionlib.new("admin"); sessionlib.set_id(session); sessionlib.set_logger(session); sessionlib.set_conn(session, conn); session.conntime = gettime(); session.type = "admin"; local stream = new_xmpp_stream(session, stream_callbacks); session.stream = stream; session.notopen = true; session.thread = runner(function (stanza) if st.is_stanza(stanza) then stanza_handler(session, stanza); elseif stanza.stream == "opened" then stream_callbacks._streamopened(session, stanza.attr); elseif stanza.stream == "closed" then stream_callbacks._streamclosed(session, stanza.attr); end end, runner_callbacks, session); function session.data(data) -- Parse the data, which will store stanzas in session.pending_stanzas if data then local ok, err = stream:feed(data); if not ok then session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); session:close("not-well-formed"); end end end session.close = session_close; session.send = function (t) session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?")); return session.rawsend(tostring(t)); end function session.rawsend(t) local ret, err = conn:write(t); if not ret then session.log("debug", "Error writing to connection: %s", err); return false, err; end return true; end sessions[conn] = session; end function s.listeners.onincoming(conn, data) local session = sessions[conn]; if session then session.data(data); end end function s.listeners.ondisconnect(conn, err) local session = sessions[conn]; if session then session.log("info", "Admin client disconnected: %s", err or "connection closed"); session.conn = nil; sessions[conn] = nil; s.events.fire_event("disconnected", { session = session }); end end function s.listeners.onreadtimeout(conn) return conn:send(" "); end return s; end local function new_client() local client = { type = "client"; events = events.new(); log = log; }; local listeners = {}; function listeners.onconnect(conn) log("debug", "Connected"); client.conn = conn; local stream = new_xmpp_stream(client, stream_callbacks); client.stream = stream; client.notopen = true; client.thread = runner(function (stanza) if st.is_stanza(stanza) then if not client.events.fire_event("received", stanza) and not stanza.attr.xmlns then client.events.fire_event("received/"..stanza.name, stanza); end elseif stanza.stream == "opened" then stream_callbacks._streamopened(client, stanza.attr); client.events.fire_event("connected"); elseif stanza.stream == "closed" then client.events.fire_event("disconnected"); stream_callbacks._streamclosed(client, stanza.attr); end end, runner_callbacks, client); client.close = session_close; function client.send(t) client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?")); return client.rawsend(tostring(t)); end function client.rawsend(t) local ret, err = conn:write(t); if not ret then client.log("debug", "Error writing to connection: %s", err); return false, err; end return true; end client.log("debug", "Opening stream..."); client:open_stream(); end function listeners.onincoming(conn, data) --luacheck: ignore 212/conn local ok, err = client.stream:feed(data); if not ok then client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); client:close("not-well-formed"); end end function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn client.log("info", "Admin client disconnected: %s", err or "connection closed"); client.conn = nil; client.events.fire_event("disconnected"); end function listeners.onreadtimeout(conn) conn:send(" "); end client.listeners = listeners; return client; end return { connection = new_connection; server = new_server; client = new_client; };