File

util/dbuffer.lua @ 11571:a8f0f87e115a

prosody: Close the state on exit (ie garbage-collect everything) This ensures __gc is called on everything that may need it, such as database connections. It was reported in the chat by Happy that SQLite3 does not close its state cleanly in WAL mode, leaving the WAL file behind. This is probably rather a bug in mod_storage_sql, but forcing a final GC sweep should also help with such things everywhere.
author Kim Alvefur <zash@zash.se>
date Tue, 18 May 2021 20:08:37 +0200 (2021-05-18)
parent 11196:f243836c449a
child 11636:11e0a0a08da3
line wrap: on
line source
local queue = require "util.queue";

local s_byte, s_sub = string.byte, string.sub;
local dbuffer_methods = {};
local dynamic_buffer_mt = { __name = "dbuffer", __index = dbuffer_methods };

function dbuffer_methods:write(data)
	if self.max_size and #data + self._length > self.max_size then
		return nil;
	end
	local ok = self.items:push(data);
	if not ok then
		self:collapse();
		ok = self.items:push(data);
	end
	if not ok then
		return nil;
	end
	self._length = self._length + #data;
	return true;
end

function dbuffer_methods:read_chunk(requested_bytes)
	local chunk, consumed = self.items:peek(), self.front_consumed;
	if not chunk then return; end
	local chunk_length = #chunk;
	local remaining_chunk_length = chunk_length - consumed;
	if not requested_bytes then
		requested_bytes = remaining_chunk_length;
	end
	if remaining_chunk_length <= requested_bytes then
		self.front_consumed = 0;
		self._length = self._length - remaining_chunk_length;
		self.items:pop();
		assert(#chunk:sub(consumed + 1, -1) == remaining_chunk_length);
		return chunk:sub(consumed + 1, -1), remaining_chunk_length;
	end
	local end_pos = consumed + requested_bytes;
	self.front_consumed = end_pos;
	self._length = self._length - requested_bytes;
	assert(#chunk:sub(consumed + 1, end_pos) == requested_bytes);
	return chunk:sub(consumed + 1, end_pos), requested_bytes;
end

function dbuffer_methods:read(requested_bytes)
	local chunks;

	if requested_bytes and requested_bytes > self._length then
		return nil;
	end

	local chunk, read_bytes = self:read_chunk(requested_bytes);
	if not requested_bytes then
		return chunk;
	elseif chunk then
		requested_bytes = requested_bytes - read_bytes;
		if requested_bytes == 0 then -- Already read everything we need
			return chunk;
		end
		chunks = {};
	else
		return nil;
	end

	-- Need to keep reading more chunks
	while chunk do
		table.insert(chunks, chunk);
		if requested_bytes > 0 then
			chunk, read_bytes = self:read_chunk(requested_bytes);
			requested_bytes = requested_bytes - read_bytes;
		else
			break;
		end
	end

	return table.concat(chunks);
end

function dbuffer_methods:discard(requested_bytes)
	if requested_bytes > self._length then
		return nil;
	end

	local chunk, read_bytes = self:read_chunk(requested_bytes);
	if chunk then
		requested_bytes = requested_bytes - read_bytes;
		if requested_bytes == 0 then -- Already read everything we need
			return true;
		end
	else
		return nil;
	end

	while chunk do
		if requested_bytes > 0 then
			chunk, read_bytes = self:read_chunk(requested_bytes);
			requested_bytes = requested_bytes - read_bytes;
		else
			break;
		end
	end
	return true;
end

-- Normalize i, j into absolute offsets within the
-- front chunk (accounting for front_consumed), and
-- ensure there is enough data in the first chunk
-- to cover any subsequent :sub() or :byte() operation
function dbuffer_methods:_prep_sub(i, j)
	if j == nil then
		j = -1;
	end
	if j < 0 then
		j = self._length + (j+1);
	end
	if i < 0 then
		i = self._length + (i+1);
	end
	if i < 1 then
		i = 1;
	end
	if j > self._length then
		j = self._length;
	end
	if i > j then
		return nil;
	end

	self:collapse(j);

	if self.front_consumed > 0 then
		i = i + self.front_consumed;
		j = j + self.front_consumed;
	end

	return i, j;
end

function dbuffer_methods:sub(i, j)
	i, j = self:_prep_sub(i, j);
	if not i then
		return "";
	end
	return s_sub(self.items:peek(), i, j);
end

function dbuffer_methods:byte(i, j)
	i = i or 1;
	j = j or i;
	i, j = self:_prep_sub(i, j);
	if not i then
		return;
	end
	return s_byte(self.items:peek(), i, j);
end

function dbuffer_methods:length()
	return self._length;
end
dbuffer_methods.len = dbuffer_methods.length; -- strings have :len()
dynamic_buffer_mt.__len = dbuffer_methods.length; -- support # operator

function dbuffer_methods:collapse(bytes)
	bytes = bytes or self._length;

	local front_chunk = self.items:peek();

	if not front_chunk or #front_chunk - self.front_consumed >= bytes then
		return;
	end

	local front_chunks = { front_chunk:sub(self.front_consumed+1) };
	local front_bytes = #front_chunks[1];

	while front_bytes < bytes do
		self.items:pop();
		local chunk = self.items:peek();
		front_bytes = front_bytes + #chunk;
		table.insert(front_chunks, chunk);
	end
	self.items:replace(table.concat(front_chunks));
	self.front_consumed = 0;
end

local function new(max_size, max_chunks)
	if max_size and max_size <= 0 then
		return nil;
	end
	return setmetatable({
		front_consumed = 0;
		_length = 0;
		max_size = max_size;
		items = queue.new(max_chunks or 32);
	}, dynamic_buffer_mt);
end

return {
	new = new;
};