Software /
code /
prosody
File
plugins/mod_limits.lua @ 12894:0598d822614f 0.12 0.12.3
mod_websocket: Fire pre-session-close event (fixes #1800)
This event was added in a7c183bb4e64 and is required to make mod_smacks know
that a session was intentionally closed and shouldn't be hibernated (see
fcea4d9e7502).
Because this was missing from mod_websocket's session.close(), mod_smacks
would always attempt to hibernate websocket sessions even if they closed
cleanly.
That mod_websocket has its own copy of session.close() is something to fix
another day (probably not in the stable branch). So for now this commit makes
the minimal change to get things working again.
Thanks to Damian and the Jitsi team for reporting.
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Mon, 20 Feb 2023 18:10:15 +0000 |
parent | 11735:7d29167bfcc3 |
child | 12977:74b9e05af71e |
line wrap: on
line source
-- Because we deal with pre-authed sessions and streams we can't be host-specific module:set_global(); local filters = require "util.filters"; local throttle = require "util.throttle"; local timer = require "util.timer"; local ceil = math.ceil; local limits_cfg = module:get_option("limits", {}); local limits_resolution = module:get_option_number("limits_resolution", 1); local default_bytes_per_second = 3000; local default_burst = 2; local rate_units = { b = 1, k = 3, m = 6, g = 9, t = 12 } -- Plan for the future. local function parse_rate(rate, sess_type) local quantity, unit, exp; if rate then quantity, unit = rate:match("^(%d+) ?([^/]+)/s$"); exp = quantity and rate_units[unit:sub(1,1):lower()]; end if not exp then module:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type, rate, default_bytes_per_second); return default_bytes_per_second; end return quantity*(10^exp); end local function parse_burst(burst, sess_type) if type(burst) == "string" then burst = burst:match("^(%d+) ?s$"); end local n_burst = tonumber(burst); if burst and not n_burst then module:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type, burst, default_burst); end return n_burst or default_burst; end -- Process config option into limits table: -- limits = { c2s = { bytes_per_second = X, burst_seconds = Y } } local limits = { c2s = { bytes_per_second = 10 * 1024; burst_seconds = 2; }; s2sin = { bytes_per_second = 30 * 1024; burst_seconds = 2; }; }; for sess_type, sess_limits in pairs(limits_cfg) do limits[sess_type] = { bytes_per_second = parse_rate(sess_limits.rate, sess_type); burst_seconds = parse_burst(sess_limits.burst, sess_type); }; end local default_filter_set = {}; function default_filter_set.bytes_in(bytes, session) local sess_throttle = session.throttle; if sess_throttle then local ok, _, outstanding = sess_throttle:poll(#bytes, true); if not ok then session.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", sess_throttle.max, #bytes, outstanding); outstanding = ceil(outstanding); session.conn:pause(); -- Read no more data from the connection until there is no outstanding data local outstanding_data = bytes:sub(-outstanding); bytes = bytes:sub(1, #bytes-outstanding); timer.add_task(limits_resolution, function () if not session.conn then return; end if sess_throttle:peek(#outstanding_data) then session.log("debug", "Resuming paused session"); session.conn:resume(); end -- Handle what we can of the outstanding data session.data(outstanding_data); end); end end return bytes; end local type_filters = { c2s = default_filter_set; s2sin = default_filter_set; s2sout = default_filter_set; }; local function filter_hook(session) local session_type = session.type:match("^[^_]+"); local filter_set, opts = type_filters[session_type], limits[session_type]; if opts then if session.conn and session.conn.setlimit then session.conn:setlimit(opts.bytes_per_second); -- Currently no burst support else session.throttle = throttle.create(opts.bytes_per_second * opts.burst_seconds, opts.burst_seconds); filters.add_filter(session, "bytes/in", filter_set.bytes_in, 1000); end end end function module.load() filters.add_filter_hook(filter_hook); end function module.unload() filters.remove_filter_hook(filter_hook); end function unlimited(session) local session_type = session.type:match("^[^_]+"); if session.conn and session.conn.setlimit then session.conn:setlimit(0); -- Currently no burst support else local filter_set = type_filters[session_type]; filters.remove_filter(session, "bytes/in", filter_set.bytes_in); session.throttle = nil; end end function module.add_host(module) local unlimited_jids = module:get_option_inherited_set("unlimited_jids", {}); if not unlimited_jids:empty() then module:hook("authentication-success", function (event) local session = event.session; local jid = session.username .. "@" .. session.host; if unlimited_jids:contains(jid) then unlimited(session); end end); module:hook("s2sout-established", function (event) local session = event.session; if unlimited_jids:contains(session.to_host) then unlimited(session); end end); module:hook("s2sin-established", function (event) local session = event.session; if session.from_host and unlimited_jids:contains(session.from_host) then unlimited(session); end end); end end