File

util/dbuffer.lua @ 13149:0aaf67f70015

mod_storage_sql: Add setting to tune SQLite3 performance vs safety Notably the default journal_mode of DELETE is somewhat slow, some users might want to catch up to the amazing performance of internal storage.
author Kim Alvefur <zash@zash.se>
date Sun, 11 Jun 2023 17:04:11 +0200
parent 12975:d10957394a3c
line wrap: on
line source

local queue = require "prosody.util.queue";

local s_byte, s_sub = string.byte, string.sub;
local dbuffer_methods = {};
local dynamic_buffer_mt = { __name = "dbuffer", __index = dbuffer_methods };

function dbuffer_methods:write(data)
	if self.max_size and #data + self._length > self.max_size then
		return nil;
	end
	local ok = self.items:push(data);
	if not ok then
		self:collapse();
		ok = self.items:push(data);
	end
	if not ok then
		return nil;
	end
	self._length = self._length + #data;
	return true;
end

function dbuffer_methods:read_chunk(requested_bytes)
	local chunk, consumed = self.items:peek(), self.front_consumed;
	if not chunk then return; end
	local chunk_length = #chunk;
	local remaining_chunk_length = chunk_length - consumed;
	if not requested_bytes then
		requested_bytes = remaining_chunk_length;
	end
	if remaining_chunk_length <= requested_bytes then
		self.front_consumed = 0;
		self._length = self._length - remaining_chunk_length;
		self.items:pop();
		assert(#chunk:sub(consumed + 1, -1) == remaining_chunk_length);
		return chunk:sub(consumed + 1, -1), remaining_chunk_length;
	end
	local end_pos = consumed + requested_bytes;
	self.front_consumed = end_pos;
	self._length = self._length - requested_bytes;
	assert(#chunk:sub(consumed + 1, end_pos) == requested_bytes);
	return chunk:sub(consumed + 1, end_pos), requested_bytes;
end

function dbuffer_methods:read(requested_bytes)
	local chunks;

	if requested_bytes and requested_bytes > self._length then
		return nil;
	end

	local chunk, read_bytes = self:read_chunk(requested_bytes);
	if not requested_bytes then
		return chunk;
	elseif chunk then
		requested_bytes = requested_bytes - read_bytes;
		if requested_bytes == 0 then -- Already read everything we need
			return chunk;
		end
		chunks = {};
	else
		return nil;
	end

	-- Need to keep reading more chunks
	while chunk do
		table.insert(chunks, chunk);
		if requested_bytes > 0 then
			chunk, read_bytes = self:read_chunk(requested_bytes);
			requested_bytes = requested_bytes - read_bytes;
		else
			break;
		end
	end

	return table.concat(chunks);
end

-- Read to, and including, the specified character sequence (return nil if not found)
function dbuffer_methods:read_until(char)
	local buffer_pos = 0;
	for i, chunk in self.items:items() do
		local start = 1 + ((i == 1) and self.front_consumed or 0);
		local char_pos = chunk:find(char, start, true);
		if char_pos then
			return self:read(1 + buffer_pos + char_pos - start);
		end
		buffer_pos = buffer_pos + #chunk - (start - 1);
	end
	return nil;
end

function dbuffer_methods:discard(requested_bytes)
	if self._length == 0 then return true; end
	if not requested_bytes or requested_bytes >= self._length then
		self.front_consumed = 0;
		self._length = 0;
		for _ in self.items:consume() do end
		return true;
	end

	local chunk, read_bytes = self:read_chunk(requested_bytes);
	requested_bytes = requested_bytes - read_bytes;
	if requested_bytes == 0 then -- Already read everything we need
		return true;
	end

	while chunk do
		if requested_bytes > 0 then
			chunk, read_bytes = self:read_chunk(requested_bytes);
			requested_bytes = requested_bytes - read_bytes;
		else
			break;
		end
	end
	return true;
end

-- Normalize i, j into absolute offsets within the
-- front chunk (accounting for front_consumed), and
-- ensure there is enough data in the first chunk
-- to cover any subsequent :sub() or :byte() operation
function dbuffer_methods:_prep_sub(i, j)
	if j == nil then
		j = -1;
	end
	if j < 0 then
		j = self._length + (j+1);
	end
	if i < 0 then
		i = self._length + (i+1);
	end
	if i < 1 then
		i = 1;
	end
	if j > self._length then
		j = self._length;
	end
	if i > j then
		return nil;
	end

	self:collapse(j);

	if self.front_consumed > 0 then
		i = i + self.front_consumed;
		j = j + self.front_consumed;
	end

	return i, j;
end

function dbuffer_methods:sub(i, j)
	i, j = self:_prep_sub(i, j);
	if not i then
		return "";
	end
	return s_sub(self.items:peek(), i, j);
end

function dbuffer_methods:byte(i, j)
	i = i or 1;
	j = j or i;
	i, j = self:_prep_sub(i, j);
	if not i then
		return;
	end
	return s_byte(self.items:peek(), i, j);
end

function dbuffer_methods:length()
	return self._length;
end
dbuffer_methods.len = dbuffer_methods.length; -- strings have :len()
dynamic_buffer_mt.__len = dbuffer_methods.length; -- support # operator

function dbuffer_methods:collapse(bytes)
	bytes = bytes or self._length;

	local front_chunk = self.items:peek();

	if not front_chunk or #front_chunk - self.front_consumed >= bytes then
		return;
	end

	local front_chunks = { front_chunk:sub(self.front_consumed+1) };
	local front_bytes = #front_chunks[1];

	while front_bytes < bytes do
		self.items:pop();
		local chunk = self.items:peek();
		front_bytes = front_bytes + #chunk;
		table.insert(front_chunks, chunk);
	end
	self.items:replace(table.concat(front_chunks));
	self.front_consumed = 0;
end

local function new(max_size, max_chunks)
	if max_size and max_size <= 0 then
		return nil;
	end
	return setmetatable({
		front_consumed = 0;
		_length = 0;
		max_size = max_size;
		items = queue.new(max_chunks or 32);
	}, dynamic_buffer_mt);
end

return {
	new = new;
};