Changeset

3007:9d122a6ae674

Merge 0.7->trunk
author Matthew Wild <mwild1@gmail.com>
date Wed, 05 May 2010 15:33:58 +0100
parents 2999:9a8f942433c4 (current diff) 3006:a3580f556c27 (diff)
children 3010:52146b82f295
files net/server_select.lua
diffstat 3 files changed, 100 insertions(+), 28 deletions(-) [+]
line wrap: on
line diff
--- a/net/server_event.lua	Wed May 05 14:17:01 2010 +0100
+++ b/net/server_event.lua	Wed May 05 15:33:58 2010 +0100
@@ -282,8 +282,21 @@
 			return nointerface, noreading, nowriting
 	end
 	
+	--TODO: Deprecate
 	function interface_mt:lock_read(switch)
-		return self:_lock(self.nointerface, switch, self.nowriting);
+		if switch then
+			return self:pause();
+		else
+			return self:resume();
+		end
+	end
+
+	function interface_mt:pause()
+		return self:_lock(self.nointerface, true, self.nowriting);
+	end
+
+	function interface_mt:resume()
+		return self:_lock(self.nointerface, false, self.nowriting);
 	end
 
 	function interface_mt:counter(c)
@@ -389,6 +402,13 @@
 			self.starttls = false; -- prevent starttls()
 		end
 	end
+
+	function interface_mt:set_mode(pattern)
+		if pattern then
+			self._pattern = pattern;
+		end
+		return self._pattern;
+	end
 	
 	function interface_mt:set_send(new_send)
 		-- No-op, we always use the underlying connection's send
@@ -445,6 +465,8 @@
 	end
 	function interface_mt:ontimeout()
 	end
+	function interface_mt:ondrain()
+	end
 	function interface_mt:onstatus()
 		debug("server.lua: Dummy onstatus()")
 	end
@@ -525,6 +547,7 @@
 				if succ then  -- writing succesful
 					interface.writebuffer = ""
 					interface.writebufferlen = 0
+					interface:ondrain();
 					if interface.fatalerror then
 						debug "closing client after writing"
 						interface:_close()  -- close interface if needed
@@ -586,7 +609,7 @@
 						interface.eventreadtimeout = nil
 					end
 				end
-				local buffer, err, part = interface.conn:receive( pattern )  -- receive buffer with "pattern"
+				local buffer, err, part = interface.conn:receive( interface._pattern )  -- receive buffer with "pattern"
 				--vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
 				buffer = buffer or part or ""
 				local len = string_len( buffer )
@@ -822,11 +845,32 @@
 	return signal_events[signal_num];
 end
 
+local function link(sender, receiver, buffersize)
+	sender:set_mode(buffersize);
+	local sender_locked;
+	
+	function receiver:ondrain()
+		if sender_locked then
+			sender:resume();
+			sender_locked = nil;
+		end
+	end
+	
+	function sender:onincoming(data)
+		receiver:write(data);
+		if receiver.writebufferlen >= buffersize then
+			sender_locked = true;
+			sender:pause();
+		end
+	end
+end
+
 return {
 
 	cfg = cfg,
 	base = base,
 	loop = loop,
+	link = link,
 	event = event,
 	event_base = base,
 	addevent = newevent,
--- a/net/server_select.lua	Wed May 05 14:17:01 2010 +0100
+++ b/net/server_select.lua	Wed May 05 15:33:58 2010 +0100
@@ -252,6 +252,7 @@
 	local dispatch = listeners.onincoming
 	local status = listeners.onstatus
 	local disconnect = listeners.ondisconnect
+	local drain = listeners.ondrain
 
 	local bufferqueue = { } -- buffer array
 	local bufferqueuelen = 0	-- end of buffer array
@@ -284,6 +285,7 @@
 		dispatch = listeners.onincoming
 		disconnect = listeners.ondisconnect
 		status = listeners.onstatus
+		drain = listeners.ondrain
 	end
 	handler.getstats = function( )
 		return readtraffic, sendtraffic
@@ -379,7 +381,7 @@
 	handler.socket = function( self )
 		return socket
 	end
-	handler.pattern = function( self, new )
+	handler.set_mode = function( self, new )
 		pattern = new or pattern
 		return pattern
 	end
@@ -392,6 +394,7 @@
 		maxreadlen = readlen or maxreadlen
 		return bufferlen, maxreadlen, maxsendlen
 	end
+	--TODO: Deprecate
 	handler.lock_read = function (self, switch)
 		if switch == true then
 			local tmp = _readlistlen
@@ -409,6 +412,12 @@
 		end
 		return noread
 	end
+	handler.pause = function (self)
+		return self:lock_read(true);
+	end
+	handler.resume = function (self)
+		return self:lock_read(false);
+	end
 	handler.lock = function( self, switch )
 		handler.lock_read (switch)
 		if switch == true then
@@ -430,7 +439,7 @@
 	end
 	local _readbuffer = function( ) -- this function reads data
 		local buffer, err, part = receive( socket, pattern )	-- receive buffer with "pattern"
-		if not err or (err == "wantread" or err == "timeout") or (part and string_len(part) > 0) then -- received something
+		if not err or (err == "wantread" or err == "timeout") then -- received something
 			local buffer = buffer or part or ""
 			local len = string_len( buffer )
 			if len > maxreadlen then
@@ -472,6 +481,9 @@
 			_sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist
 			_ = needtls and handler:starttls(nil, true)
 			_writetimes[ handler ] = nil
+			if drain then
+				drain(handler)
+			end
 			_ = toclose and handler:close( )
 			return true
 		elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write
@@ -666,6 +678,28 @@
 	--mem_free( )
 end
 
+local function link(sender, receiver, buffersize)
+	sender:set_mode(buffersize);
+	local sender_locked;
+	local _sendbuffer = receiver.sendbuffer;
+	function receiver.sendbuffer()
+		_sendbuffer();
+		if sender_locked and receiver.bufferlen() < buffersize then
+			sender:lock_read(false); -- Unlock now
+			sender_locked = nil;
+		end
+	end
+	
+	local _readbuffer = sender.readbuffer;
+	function sender.readbuffer()
+		_readbuffer();
+		if not sender_locked and receiver.bufferlen() >= buffersize then
+			sender_locked = true;
+			sender:lock_read(true);
+		end
+	end
+end
+
 ----------------------------------// PUBLIC //--
 
 addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server
@@ -889,6 +923,7 @@
 	wrapclient = wrapclient,
 	
 	loop = loop,
+	link = link,
 	stats = stats,
 	closeall = closeall,
 	addtimer = addtimer,
--- a/plugins/mod_proxy65.lua	Wed May 05 14:17:01 2010 +0100
+++ b/plugins/mod_proxy65.lua	Wed May 05 15:33:58 2010 +0100
@@ -20,6 +20,7 @@
 local config_get = require "core.configmanager".get;
 local connlisteners = require "net.connlisteners";
 local sha1 = require "util.hashes".sha1;
+local server = require "net.server";
 
 local host, name = module:get_host(), "SOCKS5 Bytestreams Service";
 local sessions, transfers, component, replies_cache = {}, {}, nil, {};
@@ -28,6 +29,7 @@
 local proxy_interface = config_get(host, "core", "proxy65_interface") or "*";
 local proxy_address = config_get(host, "core", "proxy65_address") or (proxy_interface ~= "*" and proxy_interface) or host;
 local proxy_acl = config_get(host, "core", "proxy65_acl");
+local max_buffer_size = 4096;
 
 local connlistener = { default_port = proxy_port, default_interface = proxy_interface, default_mode = "*a" };
 
@@ -84,8 +86,8 @@
 				transfers[sha].initiator = conn;
 				session.sha = sha;
 				module:log("debug", "initiator connected ... ");
-				throttle_sending(conn, transfers[sha].target);
-				throttle_sending(transfers[sha].target, conn);
+				server.link(conn, transfers[sha].target, max_buffer_size);
+				server.link(transfers[sha].target, conn, max_buffer_size);
 			end
 			conn:write(string.char(5, 0, 0, 3, sha:len()) .. sha .. string.char(0, 0)); -- VER, REP, RSV, ATYP, BND.ADDR (sha), BND.PORT (2 Byte)
 			conn:lock_read(true)
@@ -234,8 +236,12 @@
 			elseif xmlns == "http://jabber.org/protocol/bytestreams" then
 				origin.send(get_stream_host(origin, stanza));
 				return true;
+			else
+				origin.send(st.error_reply(stanza, "cancel", "service-unavailable"));
+				return true;
 			end
 		elseif stanza.name == "iq" and type == "set" then
+			module:log("debug", "Received activation request from %s", stanza.attr.from);
 			local reply, from, to, sid = set_activation(stanza);
 			if reply ~= nil and from ~= nil and to ~= nil and sid ~= nil then
 				local sha = sha1(sid .. from .. to, true);
@@ -246,6 +252,15 @@
 					transfers[sha].activated = true;
 					transfers[sha].target:lock_read(false);
 					transfers[sha].initiator:lock_read(false);
+				else
+					module:log("debug", "Both parties were not yet connected");
+					local message = "Neither party is connected to the proxy";
+					if transfers[sha].initiator then
+						message = "The recipient is not connected to the proxy";
+					elseif transfers[sha].target then
+						message = "The sender (you) is not connected to the proxy";
+					end
+					origin.send(st.error_reply(stanza, "cancel", "not-allowed", message));
 				end
 			else
 				module:log("error", "activation failed: sid: %s, initiator: %s, target: %s", tostring(sid), tostring(from), tostring(to));
@@ -262,25 +277,3 @@
 
 connlisteners.start(module.host .. ':proxy65');
 component = componentmanager.register_component(host, handle_to_domain);
-local sender_lock_threshold = 4096;
-function throttle_sending(sender, receiver)
-	sender:pattern(sender_lock_threshold);
-	local sender_locked;
-	local _sendbuffer = receiver.sendbuffer;
-	function receiver.sendbuffer()
-		_sendbuffer();
-		if sender_locked and receiver.bufferlen() < sender_lock_threshold then
-			sender:lock_read(false); -- Unlock now
-			sender_locked = nil;
-		end
-	end
-	
-	local _readbuffer = sender.readbuffer;
-	function sender.readbuffer()
-		_readbuffer();
-		if not sender_locked and receiver.bufferlen() >= sender_lock_threshold then
-			sender_locked = true;
-			sender:lock_read(true);
-		end
-	end
-end