Changeset

9581:1be99aedb0b3

net.server: Add an API for holding writes of outgoing data
author Kim Alvefur <zash@zash.se>
date Thu, 25 Oct 2018 15:12:59 +0200
parents 9580:5fe91bfb734c
children 9586:44dbee2f173f
files net/server_epoll.lua net/server_event.lua net/server_select.lua
diffstat 3 files changed, 50 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/net/server_epoll.lua	Sun Oct 28 17:11:18 2018 +0100
+++ b/net/server_epoll.lua	Thu Oct 25 15:12:59 2018 +0200
@@ -409,8 +409,10 @@
 	else
 		self.writebuffer = { data };
 	end
-	self:setwritetimeout();
-	self:set(nil, true);
+	if not self._write_lock then
+		self:setwritetimeout();
+		self:set(nil, true);
+	end
 	return #data;
 end
 interface.send = interface.write;
@@ -590,6 +592,20 @@
 	end);
 end
 
+function interface:pause_writes()
+	self._write_lock = true;
+	self:setwritetimeout(false);
+	self:set(nil, false);
+end
+
+function interface:resume_writes()
+	self._write_lock = nil;
+	if self.writebuffer[1] then
+		self:setwritetimeout();
+		self:set(nil, true);
+	end
+end
+
 -- Connected!
 function interface:onconnect()
 	if self.conn and not self.peername and self.conn.getpeername then
--- a/net/server_event.lua	Sun Oct 28 17:11:18 2018 +0100
+++ b/net/server_event.lua	Thu Oct 25 15:12:59 2018 +0200
@@ -273,6 +273,19 @@
 	end
 end
 
+function interface_mt:pause_writes()
+	return self:_lock(self.nointerface, self.noreading, true);
+end
+
+function interface_mt:resume_writes()
+	self:_lock(self.nointerface, self.noreading, false);
+	if self.writecallback and not self.eventwrite then
+		self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT );  -- register callback
+		return true;
+	end
+end
+
+
 function interface_mt:counter(c)
 	if c then
 		self._connections = self._connections + c
--- a/net/server_select.lua	Sun Oct 28 17:11:18 2018 +0100
+++ b/net/server_select.lua	Thu Oct 25 15:12:59 2018 +0200
@@ -485,20 +485,27 @@
 		out_error( "server.lua, lock() is deprecated" )
 		handler.lock_read (self, switch)
 		if switch == true then
-			local tmp = _sendlistlen
-			_sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
-			_writetimes[ handler ] = nil
-			if _sendlistlen ~= tmp then
-				nosend = true
-			end
+			handler.pause_writes (self)
 		elseif switch == false then
-			if nosend then
-				nosend = false
-				write( "" )
-			end
+			handler.resume_writes (self)
 		end
 		return noread, nosend
 	end
+	handler.pause_writes = function (self)
+		local tmp = _sendlistlen
+		_sendlistlen = removesocket( _sendlist, socket, _sendlistlen )
+		_writetimes[ handler ] = nil
+		if _sendlistlen ~= tmp then
+			nosend = true
+		end
+	end
+	handler.resume_writes = function (self)
+		if nosend then
+			nosend = false
+			write( "" )
+		end
+	end
+
 	local _readbuffer = function( ) -- this function reads data
 		local buffer, err, part = receive( socket, pattern )	-- receive buffer with "pattern"
 		if not err or (err == "wantread" or err == "timeout") then -- received something
@@ -716,7 +723,7 @@
 	function receiver.sendbuffer()
 		_sendbuffer();
 		if sender_locked and receiver.bufferlen() < buffersize then
-			sender:resume(); -- Unlock now
+			sender:lock_read(false); -- Unlock now
 			sender_locked = nil;
 		end
 	end
@@ -726,7 +733,7 @@
 		_readbuffer();
 		if not sender_locked and receiver.bufferlen() >= buffersize then
 			sender_locked = true;
-			sender:pause();
+			sender:lock_read(true);
 		end
 	end
 	sender:set_mode("*a");