File

plugins/mod_limits.lua @ 11906:ba3344926e18

MUC: Add option to include form in registration query This was originally not done based on my interpretation of XEP-0045. Today's reading, however, revealed that it actually says the result > SHOULD contain **at least** a <username/> element (emphasis mine) I take this to mean that including a form **is** allowed (and I think this is sensible). Tigase already includes the form I believe. I've gated the new behaviour behind a (default off) option, because it hasn't been tested for compatibility with clients. My primary desire for it is in Snikket, where the clients will be tested to ensure compatibility with this. I don't anticipate that (m)any clients would break, so maybe after 0.12 we can experiment with enabling it by default and eventually remove the option.
author Matthew Wild <mwild1@gmail.com>
date Mon, 15 Nov 2021 16:11:03 +0000
parent 11735:7d29167bfcc3
child 12977:74b9e05af71e
line wrap: on
line source

-- Because we deal with pre-authed sessions and streams we can't be host-specific
module:set_global();

local filters = require "util.filters";
local throttle = require "util.throttle";
local timer = require "util.timer";
local ceil = math.ceil;

local limits_cfg = module:get_option("limits", {});
local limits_resolution = module:get_option_number("limits_resolution", 1);

local default_bytes_per_second = 3000;
local default_burst = 2;

local rate_units = { b = 1, k = 3, m = 6, g = 9, t = 12 } -- Plan for the future.
local function parse_rate(rate, sess_type)
	local quantity, unit, exp;
	if rate then
		quantity, unit = rate:match("^(%d+) ?([^/]+)/s$");
		exp = quantity and rate_units[unit:sub(1,1):lower()];
	end
	if not exp then
		module:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type, rate, default_bytes_per_second);
		return default_bytes_per_second;
	end
	return quantity*(10^exp);
end

local function parse_burst(burst, sess_type)
	if type(burst) == "string" then
		burst = burst:match("^(%d+) ?s$");
	end
	local n_burst = tonumber(burst);
	if burst and not n_burst then
		module:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type, burst, default_burst);
	end
	return n_burst or default_burst;
end

-- Process config option into limits table:
-- limits = { c2s = { bytes_per_second = X, burst_seconds = Y } }
local limits = {
	c2s = {
		bytes_per_second = 10 * 1024;
		burst_seconds = 2;
	};
	s2sin = {
		bytes_per_second = 30 * 1024;
		burst_seconds = 2;
	};
};

for sess_type, sess_limits in pairs(limits_cfg) do
	limits[sess_type] = {
		bytes_per_second = parse_rate(sess_limits.rate, sess_type);
		burst_seconds = parse_burst(sess_limits.burst, sess_type);
	};
end

local default_filter_set = {};

function default_filter_set.bytes_in(bytes, session)
	local sess_throttle = session.throttle;
	if sess_throttle then
		local ok, _, outstanding = sess_throttle:poll(#bytes, true);
		if not ok then
			session.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", sess_throttle.max, #bytes, outstanding);
			outstanding = ceil(outstanding);
			session.conn:pause(); -- Read no more data from the connection until there is no outstanding data
			local outstanding_data = bytes:sub(-outstanding);
			bytes = bytes:sub(1, #bytes-outstanding);
			timer.add_task(limits_resolution, function ()
				if not session.conn then return; end
				if sess_throttle:peek(#outstanding_data) then
					session.log("debug", "Resuming paused session");
					session.conn:resume();
				end
				-- Handle what we can of the outstanding data
				session.data(outstanding_data);
			end);
		end
	end
	return bytes;
end

local type_filters = {
	c2s = default_filter_set;
	s2sin = default_filter_set;
	s2sout = default_filter_set;
};

local function filter_hook(session)
	local session_type = session.type:match("^[^_]+");
	local filter_set, opts = type_filters[session_type], limits[session_type];
	if opts then
		if session.conn and session.conn.setlimit then
			session.conn:setlimit(opts.bytes_per_second);
			-- Currently no burst support
		else
			session.throttle = throttle.create(opts.bytes_per_second * opts.burst_seconds, opts.burst_seconds);
			filters.add_filter(session, "bytes/in", filter_set.bytes_in, 1000);
		end
	end
end

function module.load()
	filters.add_filter_hook(filter_hook);
end

function module.unload()
	filters.remove_filter_hook(filter_hook);
end

function unlimited(session)
	local session_type = session.type:match("^[^_]+");
	if session.conn and session.conn.setlimit then
		session.conn:setlimit(0);
		-- Currently no burst support
	else
		local filter_set = type_filters[session_type];
		filters.remove_filter(session, "bytes/in", filter_set.bytes_in);
		session.throttle = nil;
	end
end

function module.add_host(module)
	local unlimited_jids = module:get_option_inherited_set("unlimited_jids", {});

	if not unlimited_jids:empty() then
		module:hook("authentication-success", function (event)
			local session = event.session;
			local jid = session.username .. "@" .. session.host;
			if unlimited_jids:contains(jid) then
				unlimited(session);
			end
		end);

		module:hook("s2sout-established", function (event)
			local session = event.session;
			if unlimited_jids:contains(session.to_host) then
				unlimited(session);
			end
		end);

		module:hook("s2sin-established", function (event)
			local session = event.session;
			if session.from_host and unlimited_jids:contains(session.from_host) then
				unlimited(session);
			end
		end);

	end
end