File

util/dbuffer.lua @ 11922:28f5c8061dad

net.server_epoll: Fix streaming downloads (thanks Menel). Changeset ff4e34c448a4 broke the way net.http.server streams downloads from disk: it made writes from the ondrain callback no longer reset the want-write flag, which caused the download to halt. Writes from the predrain handler must still do nothing but add to the buffer, since the code that invokes predrain is about to do all the socket writing anyway.
author Kim Alvefur <zash@zash.se>
date Fri, 19 Nov 2021 15:45:01 +0100
parent 11637:19cddf92fcc2
child 12762:79b89f382290
line wrap: on
line source

local queue = require "util.queue";

-- Local aliases for the string functions used by :sub() and :byte()
local s_byte, s_sub = string.byte, string.sub;
-- Table holding the methods available on every buffer object
local dbuffer_methods = {};
-- Metatable applied to buffer objects; method lookups resolve via dbuffer_methods
local dynamic_buffer_mt = { __name = "dbuffer", __index = dbuffer_methods };

--- Append a string to the end of the buffer.
-- The write is refused when it would take the buffered byte count above
-- `max_size` (if one is set). When the chunk queue rejects the push, the
-- existing chunks are merged with collapse() and the push is retried once.
-- @param data string to append
-- @return true on success, nil if the data could not be stored
function dbuffer_methods:write(data)
	if self.max_size and #data + self._length > self.max_size then
		return nil;
	end
	local chunks = self.items;
	if not chunks:push(data) then
		-- Push was rejected (presumably the queue is full): merge the
		-- buffered chunks into one to free up slots, then try again
		self:collapse();
		if not chunks:push(data) then
			return nil;
		end
	end
	self._length = self._length + #data;
	return true;
end

--- Remove and return data from the front chunk only.
-- Never crosses a chunk boundary: at most the unread remainder of the
-- front chunk is returned, even when `requested_bytes` is larger.
-- @param requested_bytes maximum number of bytes wanted
--   (default: the whole unread remainder of the front chunk)
-- @return the data and its length in bytes, or nothing when there is no chunk
function dbuffer_methods:read_chunk(requested_bytes)
	local front = self.items:peek();
	if not front then
		return;
	end
	local offset = self.front_consumed;
	local available = #front - offset;
	requested_bytes = requested_bytes or available;

	local taken, last;
	if available <= requested_bytes then
		-- The rest of the front chunk is used up, so drop it from the queue
		self.front_consumed = 0;
		self._length = self._length - available;
		self.items:pop();
		taken, last = available, -1;
	else
		-- Only part of the chunk is wanted: advance the read offset
		last = offset + requested_bytes;
		self.front_consumed = last;
		self._length = self._length - requested_bytes;
		taken = requested_bytes;
	end

	local data = front:sub(offset + 1, last);
	-- Sanity check: the slice must be exactly as long as we accounted for
	assert(#data == taken);
	return data, taken;
end

--- Remove and return bytes from the front of the buffer.
-- With a byte count, exactly that many bytes are returned, gathered from
-- as many chunks as necessary; nil is returned (and nothing is consumed)
-- when fewer bytes than that are buffered.
-- Without a byte count, only the unread remainder of the front chunk is
-- returned, not the entire buffer.
-- @param requested_bytes number of bytes to read (optional)
-- @return the data as a string, or nil if it cannot be supplied
function dbuffer_methods:read(requested_bytes)
	if requested_bytes and requested_bytes > self._length then
		return nil;
	end

	local first, first_length = self:read_chunk(requested_bytes);
	if not requested_bytes then
		return first;
	end
	if not first then
		return nil;
	end

	local outstanding = requested_bytes - first_length;
	if outstanding == 0 then -- The front chunk alone satisfied the request
		return first;
	end

	-- The request spans several chunks: gather the pieces, join them once
	local pieces = { first };
	while outstanding > 0 do
		local piece, piece_length = self:read_chunk(outstanding);
		outstanding = outstanding - piece_length;
		pieces[#pieces + 1] = piece;
	end

	return table.concat(pieces);
end

--- Read up to, and including, the first occurrence of a delimiter.
-- The delimiter is matched literally (plain find, no Lua patterns) and
-- the returned data ends with the complete delimiter, however many bytes
-- long it is. An empty delimiter matches immediately and reads zero bytes.
-- Each chunk is searched on its own, so a multi-byte delimiter that is
-- split across two chunks is not detected.
-- @param char delimiter string to look for
-- @return the data through the end of the delimiter, or nil if not found
--   (in which case nothing is consumed)
function dbuffer_methods:read_until(char)
	local buffer_pos = 0; -- unread bytes in the chunks searched so far
	for i, chunk in self.items:items() do
		-- Skip the already-consumed prefix of the front chunk
		-- (assumes items() numbers the front chunk as 1)
		local start = 1 + ((i == 1) and self.front_consumed or 0);
		-- Use the END of the match, so that a delimiter longer than one
		-- byte is returned whole instead of being cut after its first byte
		local _, match_end = chunk:find(char, start, true);
		if match_end then
			return self:read(1 + buffer_pos + match_end - start);
		end
		buffer_pos = buffer_pos + #chunk - (start - 1);
	end
	return nil;
end

--- Drop bytes from the front of the buffer without returning them.
-- @param requested_bytes number of bytes to throw away
-- @return true once the bytes have been dropped; nil if fewer bytes than
--   requested are buffered, or if the buffer holds no chunk at all
function dbuffer_methods:discard(requested_bytes)
	if requested_bytes > self._length then
		return nil;
	end

	local front, dropped = self:read_chunk(requested_bytes);
	if not front then
		return nil;
	end

	-- Keep consuming chunk by chunk until the full amount has been dropped
	local outstanding = requested_bytes - dropped;
	while outstanding > 0 do
		local _, dropped_more = self:read_chunk(outstanding);
		outstanding = outstanding - dropped_more;
	end
	return true;
end

-- Translate the buffer-relative range i..j into positions inside the
-- front chunk. Negative indices count back from the end of the buffer
-- and out-of-range values are clamped. Chunks are merged as needed so
-- that the front chunk contains everything up to j, which lets callers
-- run a plain string operation on it afterwards.
-- Returns nil when the resulting range is empty.
function dbuffer_methods:_prep_sub(i, j)
	local length = self._length;

	-- Resolve a missing or negative end index against the buffer length
	if j == nil then
		j = length;
	elseif j < 0 then
		j = length + (j + 1);
	end
	if i < 0 then
		i = length + (i + 1);
	end

	-- Clamp both ends to the buffered data
	i = (i < 1) and 1 or i;
	j = (j > length) and length or j;
	if i > j then
		return nil;
	end

	self:collapse(j);

	-- Shift past whatever part of the front chunk was already consumed
	local skip = self.front_consumed;
	if skip > 0 then
		return i + skip, j + skip;
	end
	return i, j;
end

--- Return a substring of the buffered data without consuming it.
-- Indices are relative to the unread data; negative indices count from
-- the end. Chunks may be merged internally to serve the request.
-- @param i start index
-- @param j end index (default: end of buffer)
-- @return the requested substring, or "" when the range is empty
function dbuffer_methods:sub(i, j)
	local first, last = self:_prep_sub(i, j);
	if first then
		return s_sub(self.items:peek(), first, last);
	end
	return "";
end

--- Return the numeric byte values of buffered data without consuming it.
-- @param i index of the first byte (default: 1)
-- @param j index of the last byte (default: same as i)
-- @return one value per byte in the range; nothing when the range is empty
function dbuffer_methods:byte(i, j)
	local first = i or 1;
	local last = j or first;
	first, last = self:_prep_sub(first, last);
	if first then
		return s_byte(self.items:peek(), first, last);
	end
end

--- Number of unread bytes currently held in the buffer.
-- @return the byte count
function dbuffer_methods:length()
	local byte_count = self._length;
	return byte_count;
end
dynamic_buffer_mt.__len = dbuffer_methods.length; -- support # operator
dbuffer_methods.len = dbuffer_methods.length; -- strings have :len()

--- Merge leading chunks so that the front chunk holds enough unread data.
-- Does nothing when the buffer has no chunks or the front chunk already
-- contains at least `bytes` unread bytes. Otherwise following chunks are
-- joined onto the front chunk until it does.
-- @param bytes number of unread bytes wanted in the front chunk
--   (default: everything buffered); larger values are clamped to the
--   amount of buffered data
function dbuffer_methods:collapse(bytes)
	local available = self._length;
	if not bytes or bytes > available then
		-- Never try to gather more than is buffered: the merge loop below
		-- would otherwise pop every chunk and then fail on a nil chunk,
		-- leaving the queue empty while _length still claims data
		bytes = available;
	end

	local front_chunk = self.items:peek();

	if not front_chunk or #front_chunk - self.front_consumed >= bytes then
		return;
	end

	-- Start from the unread tail of the front chunk
	local front_chunks = { front_chunk:sub(self.front_consumed + 1) };
	local front_bytes = #front_chunks[1];

	while front_bytes < bytes do
		self.items:pop();
		local chunk = self.items:peek();
		front_bytes = front_bytes + #chunk;
		front_chunks[#front_chunks + 1] = chunk;
	end
	-- The last chunk gathered is still at the front of the queue; swap it
	-- for the merged string (assumes replace() overwrites the front item)
	self.items:replace(table.concat(front_chunks));
	self.front_consumed = 0;
end

--- Create a new, empty buffer.
-- @param max_size maximum number of bytes the buffer may hold
--   (optional; no limit is enforced when omitted)
-- @param max_chunks size passed to queue.new() for the chunk queue
--   (default: 32)
-- @return the buffer object, or nil if max_size is zero or negative
local function new(max_size, max_chunks)
	local size_is_invalid = max_size and max_size <= 0;
	if size_is_invalid then
		return nil;
	end
	local buffer = {
		front_consumed = 0;
		_length = 0;
		max_size = max_size;
		items = queue.new(max_chunks or 32);
	};
	return setmetatable(buffer, dynamic_buffer_mt);
end

-- Module exports: only the constructor is public
return {
	new = new;
};