File

plugins/mod_limits.lua @ 11704:0a8671f32424

mod_s2s: Guard against missing 'to' on incoming stream Given an incoming <stream:stream from="example.com"> this line would have mistakenly reported the 'from' as the local host. Neither are technically required and may be missing, especially on connections used only for Dialback. Outgoing connections initiated by Prosody always have 'from_host' and 'to_host', so it is safer to check it this way.
author Kim Alvefur <zash@zash.se>
date Sun, 18 Jul 2021 09:08:04 +0200
parent 11560:3bbb1af92514
child 11734:c0fc4ca74046
line wrap: on
line source

-- Because we deal with pre-authed sessions and streams we can't be host-specific
module:set_global();

local filters = require "util.filters";
local throttle = require "util.throttle";
local timer = require "util.timer";
local ceil = math.ceil;

local limits_cfg = module:get_option("limits", {});
local limits_resolution = module:get_option_number("limits_resolution", 1);

local default_bytes_per_second = 3000;
local default_burst = 2;

local rate_units = { b = 1, k = 3, m = 6, g = 9, t = 12 } -- Plan for the future.
local function parse_rate(rate, sess_type)
	local quantity, unit, exp;
	if rate then
		quantity, unit = rate:match("^(%d+) ?([^/]+)/s$");
		exp = quantity and rate_units[unit:sub(1,1):lower()];
	end
	if not exp then
		module:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type, rate, default_bytes_per_second);
		return default_bytes_per_second;
	end
	return quantity*(10^exp);
end

local function parse_burst(burst, sess_type)
	if type(burst) == "string" then
		burst = burst:match("^(%d+) ?s$");
	end
	local n_burst = tonumber(burst);
	if burst and not n_burst then
		module:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type, burst, default_burst);
	end
	return n_burst or default_burst;
end

-- Process config option into limits table:
-- limits = { c2s = { bytes_per_second = X, burst_seconds = Y } }
local limits = {
	c2s = {
		bytes_per_second = 10 * 1024;
		burst_seconds = 2;
	};
	s2sin = {
		bytes_per_second = 30 * 1024;
		burst_seconds = 2;
	};
};

for sess_type, sess_limits in pairs(limits_cfg) do
	limits[sess_type] = {
		bytes_per_second = parse_rate(sess_limits.rate, sess_type);
		burst_seconds = parse_burst(sess_limits.burst, sess_type);
	};
end

local default_filter_set = {};

function default_filter_set.bytes_in(bytes, session)
	local sess_throttle = session.throttle;
	if sess_throttle then
		local ok, _, outstanding = sess_throttle:poll(#bytes, true);
		if not ok then
			session.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", sess_throttle.max, #bytes, outstanding);
			outstanding = ceil(outstanding);
			session.conn:pause(); -- Read no more data from the connection until there is no outstanding data
			local outstanding_data = bytes:sub(-outstanding);
			bytes = bytes:sub(1, #bytes-outstanding);
			timer.add_task(limits_resolution, function ()
				if not session.conn then return; end
				if sess_throttle:peek(#outstanding_data) then
					session.log("debug", "Resuming paused session");
					session.conn:resume();
				end
				-- Handle what we can of the outstanding data
				session.data(outstanding_data);
			end);
		end
	end
	return bytes;
end

local type_filters = {
	c2s = default_filter_set;
	s2sin = default_filter_set;
	s2sout = default_filter_set;
};

local function filter_hook(session)
	local session_type = session.type:match("^[^_]+");
	local filter_set, opts = type_filters[session_type], limits[session_type];
	if opts then
		if session.conn and session.conn.setlimit then
			session.conn:setlimit(opts.bytes_per_second);
			-- Currently no burst support
		else
			session.throttle = throttle.create(opts.bytes_per_second * opts.burst_seconds, opts.burst_seconds);
			filters.add_filter(session, "bytes/in", filter_set.bytes_in, 1000);
		end
	end
end

function module.load()
	filters.add_filter_hook(filter_hook);
end

function module.unload()
	filters.remove_filter_hook(filter_hook);
end

function module.add_host(module)
	local unlimited_jids = module:get_option_inherited_set("unlimited_jids", {});

	if not unlimited_jids:empty() then
		module:hook("authentication-success", function (event)
			local session = event.session;
			local session_type = session.type:match("^[^_]+");
			local jid = session.username .. "@" .. session.host;
			if unlimited_jids:contains(jid) then
				if session.conn and session.conn.setlimit then
					session.conn:setlimit(0);
					-- Currently no burst support
				else
					local filter_set = type_filters[session_type];
					filters.remove_filter(session, "bytes/in", filter_set.bytes_in);
					session.throttle = nil;
				end
			end
		end);
	end
end