Software /
code /
prosody
File
plugins/mod_websocket.lua @ 13652:a08065207ef0
net.server_epoll: Call :shutdown() on TLS sockets when supported
Comment from Matthew:
This fixes a potential issue where the Prosody process gets blocked on sockets
waiting for them to close. Unlike non-TLS sockets, closing a TLS socket sends
layer 7 data, and this can cause problems for sockets which are in the process
of being cleaned up.
This depends on LuaSec changes which are not yet upstream.
From Martijn's original email:
So first my analysis of luasec. In ssl.c the socket is put into blocking
mode right before calling SSL_shutdown() inside meth_destroy(). My best
guess as to why is that meth_destroy is linked to the __close
and __gc methods, which can't exactly be called multiple times, and
luasec wants to make sure that a TLS session is shut down as cleanly
as possible.
I can't say I disagree with this reasoning and don't want to change this
behaviour. My solution to this without changing the current behaviour is
to introduce a shutdown() method. I am aware that this overlaps in a
conflicting way with tcp's shutdown method, but it stays close to the
OpenSSL name. This method calls SSL_shutdown() in the current
(non)blocking mode of the underlying socket and returns a boolean
whether or not the shutdown is completed (matching SSL_shutdown()'s 0
or 1 return values), and returns the familiar ssl_ioerror() strings on
error with a false for completion. This error can then be used to
determine if we have wantread/wantwrite to finalize things. Once
meth_shutdown() has been called once a shutdown flag will be set, which
indicates to meth_destroy() that the SSL_shutdown() has been handled
by the application and it shouldn't be needed to set the socket to
blocking mode. I've left the SSL_shutdown() call in the
LSEC_STATE_CONNECTED to prevent TOCTOU if the application reaches a
timeout for the shutdown code, which might allow SSL_shutdown() to
clean up anyway at the last possible moment.
Another thing I've changed in luasec is the call to socket_setblocking()
right before calling close(2) in socket_destroy() in usocket.c.
According to the latest POSIX[0]:
Note that the requirement for close() on a socket to block for up to
the current linger interval is not conditional on the O_NONBLOCK
setting.
Which I read to mean that removing O_NONBLOCK on the socket before close
doesn't impact the behaviour and only causes noise in system call
tracers. I didn't touch the windows bits of this, since I don't do
windows.
For the prosody side of things I've made the TLS shutdown bits resemble
interface:onwritable(), and put it under a combined guard of self._tls
and self.conn.shutdown. The self._tls bit is there to prevent getting
stuck on this condition, and self.conn.shutdown is there to prevent the
code being called by instances where the patched luasec isn't deployed.
The destroy() method can be called from various places, and I read it
as the "we give up" error path. To accommodate these unexpected
entry points I've added a single call to self.conn:shutdown() to prevent
the socket from being put into blocking mode. I don't expect it to serve
any other purpose here. As before, the self.conn.shutdown
check is there to make sure it's not called on unpatched luasec
deployments, and self._tls is there to make sure we don't call shutdown()
on TCP sockets.
I wouldn't recommend logging of the conn:shutdown() error inside
close(), since a lot of clients simply close the connection before
SSL_shutdown() is done.
author | Martijn van Duren <martijn@openbsd.org> |
---|---|
date | Thu, 06 Feb 2025 15:04:38 +0000 |
parent | 13213:50324f66ca2a |
child | 13720:c3c4281c1339 |
line wrap: on
line source
-- Prosody IM
-- Copyright (C) 2012-2014 Florian Zeitz
--
-- This project is MIT/X11 licensed. Please see the
-- COPYING file in the source package for more information.
--
-- luacheck: ignore 431/log

module:set_global();

-- Utility libraries
local add_task = require("prosody.util.timer").add_task;
local add_filter = require("prosody.util.filters").add_filter;
local sha1 = require("prosody.util.hashes").sha1;
local base64 = require("prosody.util.encodings").base64.encode;
local st = require("prosody.util.stanza");
local parse_xml = require("prosody.util.xml").parse;
local contains_token = require("prosody.util.http").contains_token;

-- Core services
local portmanager = require("prosody.core.portmanager");
local sm_destroy_session = require("prosody.core.sessionmanager").destroy_session;
local log = module._log;

-- WebSocket frame handling
local dbuffer = require("prosody.util.dbuffer");
local websocket_frames = require("prosody.net.websocket.frames");
local parse_frame = websocket_frames.parse;
local build_frame = websocket_frames.build;
local build_close = websocket_frames.build_close;
local parse_close = websocket_frames.parse_close;

local t_concat = table.concat;

-- Configuration. The frame buffer limit defaults to twice the stanza
-- size limit.
local stanza_size_limit = module:get_option_integer("c2s_stanza_size_limit", 1024 * 256, 10000);
local frame_buffer_limit = module:get_option_integer("websocket_frame_buffer_limit", 2 * stanza_size_limit, 0);
local frame_fragment_limit = module:get_option_integer("websocket_frame_fragment_limit", 8, 0);
local stream_close_timeout = module:get_option_period("c2s_close_timeout", 5);
local consider_websocket_secure = module:get_option_boolean("consider_websocket_secure");

-- The option is only read in order to log a notice when it is still set.
local cross_domain = module:get_option("cross_domain_websocket");
if cross_domain ~= nil then
	module:log("info", "The 'cross_domain_websocket' option has been deprecated");
end

-- XML namespaces used below
local xmlns_framing = "urn:ietf:params:xml:ns:xmpp-framing";
local xmlns_streams = "http://etherx.jabber.org/streams";
local xmlns_client = "jabber:client";
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
module:depends("c2s")
local sessions = module:shared("c2s/sessions");
local c2s_listener = portmanager.get_service("c2s").listener;

--- Session methods

--- Send the `<open/>` framing element that opens a stream.
-- handle_request installs this as `session.open_stream`.
-- @param session session table; its `send` function is used for output
-- @param from value for the `from` attribute, defaults to `session.host`
-- @param to value for the `to` attribute (may be nil)
local function session_open_stream(session, from, to)
	local attr = {
		xmlns = xmlns_framing,
		["xml:lang"] = "en",
		version = "1.0",
		id = session.streamid or "",
		from = from or session.host, to = to,
	};
	if session.stream_attrs then
		session:stream_attrs(from, to, attr)
	end
	session.send(st.stanza("open", attr));
end

--- Close a WebSocket session.
-- handle_request installs this as `session.close`. Fires "pre-session-close"
-- (handlers may replace the reason), sends an optional <stream:error> and the
-- `<close/>` framing element, then destroys the session and closes the
-- connection with a 1000 "Stream closed" close frame.
-- @param session session table
-- @param reason nil (no error, initiated by us), false (initiated by the
--   client), a string naming a stream error condition, a stanza used as the
--   stream error, or a table with `condition` and optional `text`/`extra`
local function session_close(session, reason)
	-- Fall back to the module logger for sessions without their own logger.
	local log = session.log or log;
	local close_event_payload = { session = session, reason = reason };
	module:context(session.host):fire_event("pre-session-close", close_event_payload);
	reason = close_event_payload.reason;
	if session.conn then
		if session.notopen then
			session:open_stream();
		end
		if reason then -- nil == no err, initiated by us, false == initiated by client
			local stream_error = st.stanza("stream:error");
			if type(reason) == "string" then -- assume stream error
				stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
			elseif st.is_stanza(reason) then
				stream_error = reason;
			elseif type(reason) == "table" then
				if reason.condition then
					stream_error:tag(reason.condition, stream_xmlns_attr):up();
					if reason.text then
						stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
					end
					if reason.extra then
						stream_error:add_child(reason.extra);
					end
				end
			end
			log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
			session.send(stream_error);
		end

		session.send(st.stanza("close", { xmlns = xmlns_framing }));
		-- Nothing may be sent on this stream after <close/>.
		function session.send() return false; end

		-- luacheck: ignore 422/reason
		-- FIXME reason should be handled in common place
		local reason = (reason and (reason.name or reason.text or reason.condition)) or reason;
		-- tostring() keeps a missing session.ip from raising here, which
		-- would abort the close before the connection is shut down.
		log("debug", "c2s stream for %s closed: %s",
			session.full_jid or ("<"..tostring(session.ip)..">"), reason or "session closed");

		local conn = session.conn;

		-- Tear down the session and send the WebSocket close frame.
		local function destroy_and_disconnect()
			sm_destroy_session(session, reason);
			conn:write(build_close(1000, "Stream closed"));
			conn:close();
		end

		-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
		if reason == nil and not session.notopen and session.type == "c2s" then
			-- Grace time to process data from authenticated cleanly-closed stream
			add_task(stream_close_timeout, function ()
				if not session.destroyed then
					log("warn", "Failed to receive a stream close response, closing connection anyway...");
					destroy_and_disconnect();
				end
			end);
		else
			destroy_and_disconnect();
		end
	end
end


--- Filters

--- Translate framing elements into their stream-level equivalents.
-- `<close/>` in the framing namespace becomes "</stream:stream>" and
-- `<open/>` becomes the opening tag of a <stream:stream> element. Any other
-- input, including data that fails to parse as XML, is returned unchanged.
-- @param data string payload of a text frame
-- @return string to hand on to the stream parser
local function filter_open_close(data)
	-- Cheap plain-text check first, to avoid parsing ordinary stanzas.
	if not data:find(xmlns_framing, 1, true) then return data; end

	local oc = parse_xml(data);
	if not oc then return data; end
	if oc.attr.xmlns ~= xmlns_framing then return data; end
	if oc.name == "close" then return "</stream:stream>"; end
	if oc.name == "open" then
		oc.name = "stream:stream";
		oc.attr.xmlns = nil;
		oc.attr["xmlns:stream"] = xmlns_streams;
		return oc:top_tag();
	end

	return data;
end

local default_get_response_text = "It works! Now point your WebSocket client to this URL to connect to Prosody."
local websocket_get_response_text = module:get_option_string("websocket_get_response_text", default_get_response_text)

local default_get_response_body = [[<!DOCTYPE html><html><head><title>Websocket</title></head><body> <p>]]..websocket_get_response_text..[[</p> </body></html>]]
local websocket_get_response_body = module:get_option_string("websocket_get_response_body", default_get_response_body)

--- Check a parsed frame (or just a parsed frame header) for validity.
-- @param frame table with `opcode`, `length`, `FIN`, `RSV1`..`RSV3` and,
--   for complete frames, `data`
-- @param max_length optional maximum payload length
-- @return true if the frame is acceptable; otherwise false, a WebSocket
--   close status code and a message. A close frame from the peer is
--   reported as false, 1000, "Goodbye".
local function validate_frame(frame, max_length)
	local opcode, length = frame.opcode, frame.length;

	if max_length and length > max_length then
		return false, 1009, "Payload too large";
	end

	-- Error cases
	if frame.RSV1 or frame.RSV2 or frame.RSV3 then -- Reserved bits non zero
		return false, 1002, "Reserved bits not zero";
	end

	if opcode == 0x8 and frame.data then -- close frame
		if length == 1 then
			return false, 1002, "Close frame with payload, but too short for status code";
		elseif length >= 2 then
			local status_code = parse_close(frame.data)
			if status_code < 1000 then
				return false, 1002, "Closed with invalid status code";
			elseif ((status_code > 1003 and status_code < 1007) or status_code > 1011) and status_code < 3000 then
				return false, 1002, "Closed with reserved status code";
			end
		end
	end

	if opcode >= 0x8 then
		if length > 125 then -- Control frame with too much payload
			return false, 1002, "Payload too large";
		end

		if not frame.FIN then -- Fragmented control frame
			return false, 1002, "Fragmented control frame";
		end
	end

	if (opcode > 0x2 and opcode < 0x8) or (opcode > 0xA) then
		return false, 1002, "Reserved opcode";
	end

	-- Check opcode
	if opcode == 0x2 then -- Binary frame
		return false, 1003, "Only text frames are supported, RFC 7395 3.2";
	elseif opcode == 0x8 then -- Close request
		return false, 1000, "Goodbye";
	end

	-- Other (XMPP-specific) validity checks
	if not frame.FIN then
		return false, 1003, "Continuation frames are not supported, RFC 7395 3.3.3";
	end
	-- Byte 60 is "<": a text frame payload has to start with an XML tag.
	if opcode == 0x01 and frame.data and frame.data:byte(1, 1) ~= 60 then
		return false, 1007, "Invalid payload start character, RFC 7395 3.3.3";
	end

	return true;
end


--- HTTP handler for the WebSocket endpoint.
-- Requests without a Sec-WebSocket-Key header, or with a method other than
-- GET, get an informational page. Requests that do not offer the "xmpp"
-- subprotocol get a 501. Otherwise the connection is handed to the c2s
-- listener, session filters translating between frames and the XML stream
-- are installed, and the 101 handshake response is sent.
-- @param event HTTP event table with `request` and `response`
-- @return a response body, a status code, or "" after a successful upgrade
function handle_request(event)
	local request, response = event.request, event.response;
	local conn = response.conn;

	conn.starttls = false; -- Prevent mod_tls from believing starttls can be done

	if not request.headers.sec_websocket_key or request.method ~= "GET" then
		return module:fire_event("http-message", {
			response = event.response;
			---
			title = "Prosody WebSocket endpoint";
			message = websocket_get_response_text;
			warning = not (consider_websocket_secure or request.secure) and "This endpoint is not considered secure!" or nil;
		}) or websocket_get_response_body;
	end

	local wants_xmpp = contains_token(request.headers.sec_websocket_protocol or "", "xmpp");

	if not wants_xmpp then
		module:log("debug", "Client didn't want to talk XMPP, list of protocols was %s", request.headers.sec_websocket_protocol or "(empty)");
		return 501;
	end

	-- Send a close frame with the given status and close the connection.
	local function websocket_close(code, message)
		conn:write(build_close(code, message));
		conn:close();
	end

	-- React to a frame rejected by validate_frame/handle_frame.
	local function websocket_handle_error(session, code, message)
		if code == 1009 then -- stanza size limit exceeded
			-- we close the session, rather than the connection,
			-- otherwise a resuming client will simply resend the
			-- offending stanza
			session:close({ condition = "policy-violation", text = "stanza too large" });
		else
			websocket_close(code, message);
		end
	end

	-- Process one complete frame. Returns the text payload to pass on ("" for
	-- frames that carry no stream data), or false, code, message on error.
	local function handle_frame(frame)
		module:log("debug", "Websocket received frame: opcode=%0x, %i bytes", frame.opcode, #frame.data);

		-- Check frame makes sense
		local frame_ok, err_status, err_text = validate_frame(frame, stanza_size_limit);
		if not frame_ok then
			return frame_ok, err_status, err_text;
		end

		local opcode = frame.opcode;
		if opcode == 0x9 then -- Ping frame
			-- Answer by sending the same frame back as a pong.
			frame.opcode = 0xA;
			frame.MASK = false; -- Clients send masked frames, servers don't, see #1484
			conn:write(build_frame(frame));
			return "";
		elseif opcode == 0xA then -- Pong frame, MAY be sent unsolicited, eg as keepalive
			return "";
		elseif opcode ~= 0x1 then -- Not text frame (which is all we support)
			log("warn", "Received frame with unsupported opcode %i", opcode);
			return "";
		end

		return frame.data;
	end

	conn:setlistener(c2s_listener);
	c2s_listener.onconnect(conn);

	local session = sessions[conn];

	-- Use upstream IP if a HTTP proxy was used
	-- See mod_http and #540
	session.ip = request.ip;

	session.secure = consider_websocket_secure or request.secure or session.secure;
	session.websocket_request = request;

	session.open_stream = session_open_stream;
	session.close = session_close;

	local frameBuffer = dbuffer.new(frame_buffer_limit, frame_fragment_limit);
	-- Incoming bytes: buffer them, extract complete frames and pass their
	-- payloads on as stream data.
	add_filter(session, "bytes/in", function(data)
		if not frameBuffer:write(data) then
			session.log("warn", "websocket frame buffer full - terminating session");
			session:close({ condition = "resource-constraint", text = "frame buffer exceeded" });
			return;
		end

		local cache = {};
		local frame, length, partial = parse_frame(frameBuffer);

		while frame do
			frameBuffer:discard(length);
			local result, err_status, err_text = handle_frame(frame);
			if not result then
				websocket_handle_error(session, err_status, err_text);
				break;
			end
			cache[#cache+1] = filter_open_close(result);
			frame, length, partial = parse_frame(frameBuffer);
		end

		if partial then
			-- The header of the next frame is already in the buffer, run
			-- some early validation here
			local frame_ok, err_status, err_text = validate_frame(partial, stanza_size_limit);
			if not frame_ok then
				websocket_handle_error(session, err_status, err_text);
			end
		end

		return t_concat(cache, "");
	end);

	-- Outgoing stanzas: work on a clone and make the namespaces explicit.
	add_filter(session, "stanzas/out", function(stanza)
		stanza = st.clone(stanza);
		local attr = stanza.attr;
		attr.xmlns = attr.xmlns or xmlns_client;
		if stanza.name:find("^stream:") then
			attr["xmlns:stream"] = attr["xmlns:stream"] or xmlns_streams;
		end
		return stanza;
	end, -1000);

	-- Outgoing bytes: wrap each chunk in a single unfragmented text frame.
	add_filter(session, "bytes/out", function(data)
		return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
	end);

	response.status_code = 101;
	response.headers.upgrade = "websocket";
	response.headers.connection = "Upgrade";
	response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
	response.headers.sec_webSocket_protocol = "xmpp";

	module:fire_event("websocket-session", { session = session, request = request });

	session.log("debug", "Sending WebSocket handshake");

	return "";
end

--- "c2s-read-timeout" handler: send a ping frame on WebSocket sessions.
-- Only sessions whose `open_stream` is session_open_stream (as set by
-- handle_request) are handled; for others nothing is returned.
-- @param event event table carrying `session`
-- @return the result of `conn:write` for WebSocket sessions, otherwise nil
local function keepalive(event)
	local session = event.session;
	if session.open_stream == session_open_stream then
		return session.conn:write(build_frame({ opcode = 0x9, FIN = true }));
	end
end

--- Per-host setup: register the keepalive hook and the HTTP endpoint.
-- @param module the module context of the host being added
function module.add_host(module)
	-- Registered once; a second identical registration adds nothing.
	module:hook("c2s-read-timeout", keepalive, -0.9);

	module:depends("http");
	module:provides("http", {
		name = "websocket";
		default_path = "xmpp-websocket";
		cors = {
			enabled = true;
		};
		route = {
			["GET"] = handle_request;
			["GET /"] = handle_request;
		};
	});
end

-- When this module is among the modules loaded for host "*", run the
-- per-host setup on the module's own context as well.
if require"prosody.core.modulemanager".get_modules_for_host("*"):contains(module.name) then
	module:add_host();
end