Changeset

13552:f0b2c026e542

net.server_epoll: Improve efficiency of sending large amounts of buffered data. Problem: The string slice operations performed when a lot of data gets buffered end up being expensive and memory-consuming. We have util.dbuffer for precisely this kind of thing. I want to keep the behavior of writebuffer being upgraded from nil to a string to a full buffer, since the last step involves three table allocations, where the previous buffer method only used one. Avoiding those allocations for simple writes like whitespace keepalives feels like it would keep memory churn down. This work was started in 2020.
author Kim Alvefur <zash@zash.se>
date Sat, 09 Nov 2024 00:37:15 +0100
parents 13551:e17ff906d71b
children 13553:850e4ade7a01
files net/server_epoll.lua
diffstat 1 files changed, 68 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/net/server_epoll.lua	Fri Nov 08 02:15:10 2024 +0100
+++ b/net/server_epoll.lua	Sat Nov 09 00:37:15 2024 +0100
@@ -6,8 +6,6 @@
 --
 
 
-local t_insert = table.insert;
-local t_concat = table.concat;
 local setmetatable = setmetatable;
 local pcall = pcall;
 local type = type;
@@ -22,6 +20,7 @@
 local monotonic = require "prosody.util.time".monotonic;
 local indexedbheap = require "prosody.util.indexedbheap";
 local createtable = require "prosody.util.table".create;
+local dbuffer = require "prosody.util.dbuffer";
 local inet = require "prosody.util.net";
 local inet_pton = inet.pton;
 local _SOCKETINVALID = socket._SOCKETINVALID or -1;
@@ -94,6 +93,15 @@
 	-- Size of chunks to read from sockets
 	read_size = 8192;
 
+	-- Maximum size of send buffer, after which additional data is rejected
+	max_send_buffer_size = 32*1024*1024;
+
+	-- How many chunks (immutable strings) to keep in the send buffer
+	send_buffer_chunks = nil;
+
+	-- Maximum amount of data to send at once (to the TCP buffers), default based on /proc/sys/net/ipv4/tcp_wmem
+	max_send_chunk = 4*1024*1024;
+
 	-- Timeout used during between steps in TLS handshakes
 	ssl_handshake_timeout = 60;
 
@@ -533,26 +541,21 @@
 	self:onconnect();
 	if not self.conn then return nil, "no-conn"; end -- could have been closed in onconnect
 	self:on("predrain");
-	local buffer = self.writebuffer;
-	local data = buffer or "";
-	if type(buffer) == "table" then
-		if buffer[3] then
-			data = t_concat(data);
-		elseif buffer[2] then
-			data = buffer[1] .. buffer[2];
-		else
-			data = buffer[1] or "";
-		end
-	end
+	local buffer = self.writebuffer or "";
+	-- Naming things ... s/data/slice/ ?
+	local data = buffer:sub(1, cfg.max_send_chunk);
 	local ok, err, partial = self.conn:send(data);
 	self._writable = ok;
-	if ok then
+	if ok and #data < #buffer then
+		-- Sent the whole 'data' but there's more in the buffer
+		ok, err, partial = nil, "timeout", ok;
+	end
+	self:debug("Sent %d out of %d buffered bytes", ok and #data or partial or 0, #buffer);
+	if ok then -- all the data we had was sent successfully
 		self:set(nil, false);
 		if cfg.keep_buffers and type(buffer) == "table" then
-			for i = #buffer, 1, -1 do
-				buffer[i] = nil;
-			end
-		else
+			buffer:discard(ok);
+		else -- string or don't keep buffers
 			self.writebuffer = nil;
 		end
 		self._writing = nil;
@@ -560,14 +563,10 @@
 		self:ondrain(); -- Be aware of writes in ondrain
 		return ok;
 	elseif partial then
-		self:debug("Sent %d out of %d buffered bytes", partial, #data);
-		if cfg.keep_buffers and type(buffer) == "table" then
-			buffer[1] = data:sub(partial+1);
-			for i = #buffer, 2, -1 do
-				buffer[i] = nil;
-			end
+		if type(buffer) == "table" then
+			buffer:discard(partial);
 		else
-			self.writebuffer = data:sub(partial+1);
+			self.writebuffer = data:sub(partial + 1);
 		end
 		self:set(nil, true);
 		self:setwritetimeout();
@@ -595,13 +594,45 @@
 -- Add data to write buffer and set flag for wanting to write
 function interface:write(data)
 	local buffer = self.writebuffer;
-	if type(buffer) == "table" then
-		t_insert(buffer, data);
+	-- (nil)    -> save string
+	-- (string) -> convert to buffer (3 tables!)
+	-- (buffer) -> write to buffer
+	if not buffer then
+		self.writebuffer = data;
 	elseif type(buffer) == "string" then
-		self:noise("Allocating buffer!")
-		self.writebuffer = { buffer, data };
-	elseif buffer == nil then
-		self.writebuffer = data;
+		local prev_buffer = buffer;
+		buffer = dbuffer.new(cfg.max_send_buffer_size, cfg.send_buffer_chunks);
+		self.writebuffer = buffer;
+		if prev_buffer then
+			-- TODO refactor, there's 3 copies of these lines
+			if not buffer:write(prev_buffer) then
+				if self._write_lock then
+					return false;
+				end
+				-- Try to flush buffer to make room
+				self:onwritable();
+				if not buffer:write(prev_buffer) then
+					return false;
+				end
+			end
+		end
+		if not buffer:write(data) then
+			if self._write_lock then
+				return false;
+			end
+			self:onwritable();
+			if not buffer:write(data) then
+				return false;
+			end
+		end
+	elseif not buffer:write(data) then
+		if self._write_lock then
+			return false;
+		end
+		self:onwritable();
+		if not buffer:write(data) then
+			return false;
+		end
 	end
 	if not self._write_lock and not self._writing then
 		if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then
@@ -619,7 +650,7 @@
 
 -- Close, possibly after writing is done
 function interface:close()
-	if self._connected and self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+	if self.writebuffer and #self.writebuffer ~= 0 then
 		self._connected = false;
 		self:set(false, true); -- Flush final buffer contents
 		self:setreadtimeout(false);
@@ -701,7 +732,7 @@
 function interface:starttls(tls_ctx)
 	if tls_ctx then self.tls_ctx = tls_ctx; end
 	self.starttls = false;
-	if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+	if self.writebuffer and #self.writebuffer ~= 0 then
 		self:debug("Start TLS after write");
 		self.ondrain = interface.starttls;
 		self:set(nil, true); -- make sure wantwrite is set
@@ -935,7 +966,10 @@
 	end
 	self:noise("Resume writes");
 	self._write_lock = nil;
-	if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
+	if self.writebuffer and #self.writebuffer ~= 0 then
+		if cfg.opportunistic_writes then
+			return self:onwritable();
+		end
 		self:setwritetimeout();
 		self:set(nil, true);
 	end