Changeset

2314:c2e1bde4d84d

Redo merge with Waqas' PBKDF2 optimizations.
author Tobias Markmann <tm@ayena.de>
date Thu, 03 Dec 2009 21:57:47 +0100
parents 2312:5ddbb9c89ffe (diff) 2313:d76e8d7b7f5d (current diff)
children 2315:174b4a83f5b7
files util/hmac.lua util/sasl/plain.lua util/sasl/scram.lua
diffstat 11 files changed, 141 insertions(+), 49 deletions(-) [+]
line wrap: on
line diff
--- a/core/modulemanager.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/core/modulemanager.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -158,6 +158,7 @@
 		log("error", "Error initializing module '%s' on '%s': %s", module_name, host, err or "nil");
 	end
 	if success then
+		hosts[host].events.fire_event("module-loaded", { module = module_name, host = host });
 		return true;
 	else -- load failed, unloading
 		unload(api_instance.host, module_name);
@@ -174,7 +175,7 @@
 end
 
 function unload(host, name, ...)
-	local mod = get_module(host, name); 
+	local mod = get_module(host, name);
 	if not mod then return nil, "module-not-loaded"; end
 	
 	if module_has_method(mod, "unload") then
@@ -200,6 +201,7 @@
 	end
 	hooks:remove(host, name);
 	modulemap[host][name] = nil;
+	hosts[host].events.fire_event("module-unloaded", { module = name, host = host });
 	return true;
 end
 
@@ -280,7 +282,7 @@
 end
 
 function call_module_method(module, method, ...)
-	if module_has_method(module, method) then	
+	if module_has_method(module, method) then
 		local f = module.module[method];
 		return pcall(f, ...);
 	else
@@ -289,7 +291,7 @@
 end
 
 ----- API functions exposed to modules -----------
--- Must all be in api.* 
+-- Must all be in api.*
 
 -- Returns the name of the current module
 function api:get_name()
--- a/net/dns.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/net/dns.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -726,7 +726,7 @@
 			local packet = sock:receive();
 			if packet then
 				response = self:decode(packet);
-				if response and self.active[response.header.id] 
+				if response and self.active[response.header.id]
 					and self.active[response.header.id][response.question.raw] then
 					--print('received response');
 					--self.print(response);
@@ -745,7 +745,7 @@
 					if not next(self.active) then self:closeall(); end
 
 					-- was the query on the wanted list?
-					local q = response.question;
+					local q = response.question[1];
 					local cos = get(self.wanted, q.class, q.type, q.name);
 					if cos then
 						for co in pairs(cos) do
@@ -768,21 +768,18 @@
 	self.time = socket.gettime();
 
 	local response = self:decode(packet);
-	if response then
+	if response and self.active[response.header.id]
+		and self.active[response.header.id][response.question.raw] then
 		--print('received response');
 		--self.print(response);
 
-		for i,section in pairs({ 'answer', 'authority', 'additional' }) do
-			for j,rr in pairs(response[section]) do
-				self:remember(rr, response.question[1].type);
-			end
+		for j,rr in pairs(response.answer) do
+			self:remember(rr, response.question[1].type);
 		end
 
 		-- retire the query
 		local queries = self.active[response.header.id];
-		if queries[response.question.raw] then
-			queries[response.question.raw] = nil;
-		end
+		queries[response.question.raw] = nil;
 		if not next(queries) then self.active[response.header.id] = nil; end
 		if not next(self.active) then self:closeall(); end
 
--- a/net/httpserver.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/net/httpserver.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -23,6 +23,9 @@
 
 local log = require "util.logger".init("httpserver");
 
+-- TODO: Should we read this from /etc/mime.types if it exists? (startup time...?)
+local mime_map = { html = "text/html", txt = "plain/text; charset=utf-8", js = "text/javascript" };
+
 local http_servers = {};
 
 module "httpserver"
@@ -65,6 +68,9 @@
 		
 		resp = { "HTTP/1.0 200 OK\r\n" };
 		t_insert(resp, "Connection: close\r\n");
+		t_insert(resp, "Content-Type: ");
+		t_insert(resp, mime_map[request.url.path:match("%.(%w+)")] or "application/octet-stream");
+		t_insert(resp, "\r\n");
 		t_insert(resp, "Content-Length: ");
 		t_insert(resp, #response);
 		t_insert(resp, "\r\n\r\n");
@@ -210,7 +216,7 @@
 function new_request(handler)
 	return { handler = handler, conn = handler.socket, 
 			write = function (...) return handler:write(...); end, state = "request", 
-			server = http_servers[handler.serverport()],
+			server = http_servers[handler:serverport()],
 			send = send_response,
 			destroy = destroy_request,
 			id = tostring{}:match("%x+$")
--- a/net/server_event.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/net/server_event.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -572,6 +572,7 @@
 					interface.eventread = nil
 					return -1
 				end
+				interface.onincoming( interface, buffer, err )  -- send new data to listener
 				if err and ( err ~= "timeout" and err ~= "wantread" ) then
 					if "wantwrite" == err then -- need to read on write event
 						if not interface.eventwrite then  -- register new write event if needed
@@ -592,7 +593,6 @@
 						return -1
 					end
 				end
-				interface.onincoming( interface, buffer, err )  -- send new data to listener
 				return EV_READ, cfg.READ_TIMEOUT
 			end
 		end
--- a/net/server_select.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/net/server_select.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -434,9 +434,27 @@
     handler.bufferlen = function( self, readlen, sendlen )
         maxsendlen = sendlen or maxsendlen
         maxreadlen = readlen or maxreadlen
-        return maxreadlen, maxsendlen
+        return bufferlen, maxreadlen, maxsendlen
+    end
+    handler.lock_read  = function (self, switch)
+        if switch == true then
+            local tmp = _readlistlen
+            _readlistlen = removesocket( _readlist, socket, _readlistlen )
+            _readtimes[ handler ] = nil
+            if _readlistlen ~= tmp then
+                noread = true
+            end
+        elseif switch == false then
+            if noread then
+                noread = false
+                _readlistlen = addsocket(_readlist, socket, _readlistlen)
+                _readtimes[ handler ] = _currenttime
+            end
+        end
+        return noread
     end
     handler.lock = function( self, switch )
+        handler.lock_read (switch)
         if switch == true then
             handler.write = idfalse
             local tmp = _sendlistlen
@@ -445,19 +463,8 @@
             if _sendlistlen ~= tmp then
                 nosend = true
             end
-            tmp = _readlistlen
-            _readlistlen = removesocket( _readlist, socket, _readlistlen )
-            _readtimes[ handler ] = nil
-            if _readlistlen ~= tmp then
-                noread = true
-            end
         elseif switch == false then
             handler.write = write
-            if noread then
-                noread = false
-                _readlistlen = addsocket(_readlist, socket, _readlistlen)
-                _readtimes[ handler ] = _currenttime
-            end
             if nosend then
                 nosend = false
                 write( "" )
@@ -467,7 +474,7 @@
     end
     local _readbuffer = function( )    -- this function reads data
         local buffer, err, part = receive( socket, pattern )    -- receive buffer with "pattern"
-        if not err or ( err == "timeout" or err == "wantread" ) then    -- received something
+        if not err or (err == "wantread" or err == "timeout") or string_len(part) > 0 then    -- received something
             local buffer = buffer or part or ""
             local len = string_len( buffer )
             if len > maxreadlen then
--- a/plugins/mod_compression.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/plugins/mod_compression.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -44,7 +44,7 @@
 		end
 );
 
--- S2Sout handling aka the client perspective in the S2S connection
+-- Hook to activate compression if remote server supports it.
 module:hook_stanza(xmlns_stream, "features",
 		function (session, stanza)
 			if not session.compressed then
@@ -135,15 +135,14 @@
 		end;
 end
 
--- TODO Support compression on S2S level too.
 module:add_handler({"s2sout_unauthed", "s2sout"}, "compressed", xmlns_compression_protocol, 
 		function(session ,stanza)
 			session.log("debug", "Activating compression...")
 			-- create deflate and inflate streams
-			deflate_stream = get_deflate_stream(session);
+			local deflate_stream = get_deflate_stream(session);
 			if not deflate_stream then return end
 			
-			inflate_stream = get_inflate_stream(session);
+			local inflate_stream = get_inflate_stream(session);
 			if not inflate_stream then return end
 			
 			-- setup compression for session.w
@@ -161,7 +160,7 @@
 			local default_stream_attr = {xmlns = "jabber:server", ["xmlns:stream"] = "http://etherx.jabber.org/streams",
 										["xmlns:db"] = 'jabber:server:dialback', version = "1.0", to = session.to_host, from = session.from_host};
 			session.sends2s("<?xml version='1.0'?>");
-			session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());			
+			session.sends2s(st.stanza("stream:stream", default_stream_attr):top_tag());
 			session.compressed = true;
 		end
 );
@@ -181,10 +180,10 @@
 				session.log("info", method.." compression selected.");
 				
 				-- create deflate and inflate streams
-				deflate_stream = get_deflate_stream(session);
+				local deflate_stream = get_deflate_stream(session);
 				if not deflate_stream then return end
 				
-				inflate_stream = get_inflate_stream(session);
+				local inflate_stream = get_inflate_stream(session);
 				if not inflate_stream then return end
 				
 				(session.sends2s or session.send)(st.stanza("compressed", {xmlns=xmlns_compression_protocol}));
--- a/plugins/mod_console.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/plugins/mod_console.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -478,7 +478,7 @@
 		for remotehost, session in pairs(host_session.s2sout) do
 			if (not match_jid) or remotehost:match(match_jid) or host:match(match_jid) then
 				count_out = count_out + 1;
-				print("    "..host.." -> "..remotehost..(session.secure and " (encrypted)" or ""));
+				print("    "..host.." -> "..remotehost..(session.secure and " (encrypted)" or "")..(session.compressed and " (compressed)" or ""));
 				if session.sendq then
 					print("        There are "..#session.sendq.." queued outgoing stanzas for this connection");
 				end
@@ -515,7 +515,7 @@
 				-- Pft! is what I say to list comprehensions
 				or (session.hosts and #array.collect(keys(session.hosts)):filter(subhost_filter)>0)) then
 				count_in = count_in + 1;
-				print("    "..host.." <- "..(session.from_host or "(unknown)")..(session.secure and " (encrypted)" or ""));
+				print("    "..host.." <- "..(session.from_host or "(unknown)")..(session.secure and " (encrypted)" or "")..(session.compressed and " (compressed)" or ""));
 				if session.type == "s2sin_unauthed" then
 						print("        Connection not yet authenticated");
 				end
--- a/plugins/mod_proxy65.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/plugins/mod_proxy65.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -55,8 +55,12 @@
 	if session.setup then
 		if session.sha ~= nil and transfers[session.sha] ~= nil then
 			local sha = session.sha;
-			if transfers[sha].activated == true and transfers[sha].initiator == conn and transfers[sha].target ~= nil then
-				transfers[sha].target:write(data);
+			if transfers[sha].activated == true and transfers[sha].target ~= nil then
+				if  transfers[sha].initiator == conn then
+					transfers[sha].target:write(data);
+				else
+					transfers[sha].initiator:write(data);
+				end
 				return;
 			end
 		end
@@ -67,7 +71,7 @@
 			data:sub(4):byte() == 0x03 and -- ATYP must be 3
 			data:sub(5):byte() == 40 and -- SHA1 HASH length must be 40 (0x28)
 			data:sub(-2):byte() == 0x00 and -- PORT must be 0, size 2 byte
-			data:sub(-1):byte() == 0x00 		
+			data:sub(-1):byte() == 0x00
 		then
 			local sha = data:sub(6, 45); -- second param is not count! it's the ending index (included!)
 			if transfers[sha] == nil then
@@ -80,10 +84,13 @@
 				transfers[sha].initiator = conn;
 				session.sha = sha;
 				module:log("debug", "initiator connected ... ");
+				throttle_sending(conn, transfers[sha].target);          
+				throttle_sending(transfers[sha].target, conn);          
 			end
 			conn:write(string.char(5, 0, 0, 3, sha:len()) .. sha .. string.char(0, 0)); -- VER, REP, RSV, ATYP, BND.ADDR (sha), BND.PORT (2 Byte)
+			conn:lock_read(true)
 		else
-			log:module("warn", "Neither data transfer nor initial connect of a participator of a transfer.")
+			module:log("warn", "Neither data transfer nor initial connect of a participator of a transfer.")
 			conn.close();
 		end
 	else
@@ -237,6 +244,8 @@
 				elseif(transfers[sha] ~= nil and transfers[sha].initiator ~= nil and transfers[sha].target ~= nil) then
 					origin.send(reply);
 					transfers[sha].activated = true;
+					transfers[sha].target:lock_read(false);
+					transfers[sha].initiator:lock_read(false);
 				end
 			else
 				module:log("error", "activation failed: sid: %s, initiator: %s, target: %s", tostring(sid), tostring(from), tostring(to));
@@ -247,9 +256,31 @@
 end
 
 if not connlisteners.register(module.host .. ':proxy65', connlistener) then
-	error("mod_proxy65: Could not establish a connection listener. Check your configuration please.");
-	error(" one possible cause for this would be that two proxy65 components share the same port.");
+	module:log("error", "mod_proxy65: Could not establish a connection listener. Check your configuration please.");
+	module:log("error", "Possibly two proxy65 components are configured to share the same port.");
 end
 
 connlisteners.start(module.host .. ':proxy65');
 component = componentmanager.register_component(host, handle_to_domain);
+local sender_lock_threshold = 4096;
+function throttle_sending(sender, receiver)
+	sender:pattern(sender_lock_threshold);
+	local sender_locked;
+	local _sendbuffer = receiver.sendbuffer;
+	function receiver.sendbuffer()
+		_sendbuffer();
+		if sender_locked and receiver.bufferlen() < sender_lock_threshold then
+			sender:lock_read(false); -- Unlock now
+			sender_locked = nil;
+		end
+	end
+	
+	local _readbuffer = sender.readbuffer;
+	function sender.readbuffer()
+		_readbuffer();
+		if not sender_locked and receiver.bufferlen() >= sender_lock_threshold then
+			sender_locked = true;
+			sender:lock_read(true);
+		end
+	end
+end
--- a/prosodyctl	Thu Dec 03 21:53:36 2009 +0100
+++ b/prosodyctl	Thu Dec 03 21:57:47 2009 +0100
@@ -462,6 +462,28 @@
 	return 1;
 end
 
+function commands.addplugin(arg)
+	local url = arg[1];
+	if url:match("^http://") then
+		local http = require "socket.http";
+		show_message("Fetching...");
+		local code, err = http.request(url);
+		if not code then
+			show_message("Failed: "..err);
+			return 1;
+		end
+		if url:match("%.lua$") then
+			local ok, err = datamanager.store(url:match("/mod_([^/]+)$"), "*", "plugins", {code});
+			if not ok then
+				show_message("Failed to save to data store: "..err);
+				return 1;
+			end
+		end
+		show_message("Saved. Don't forget to load the module using the config file or admin console!");
+	else
+		show_message("Sorry, I don't understand how to fetch plugins from there.");
+	end
+end
 
 ---------------------
 
--- a/util/dependencies.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/util/dependencies.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -57,7 +57,7 @@
 if not lfs then
 	missingdep("luafilesystem", {
 			["luarocks"] = "luarocks install luafilesystem";
-	 		["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-luafilesystem0";
+	 		["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-filesystem0";
 	 		["Source"] = "http://www.keplerproject.org/luafilesystem/";
 	 	});
 	fatal = true;
--- a/util/pluginloader.lua	Thu Dec 03 21:53:36 2009 +0100
+++ b/util/pluginloader.lua	Thu Dec 03 21:57:47 2009 +0100
@@ -9,11 +9,19 @@
 
 local plugin_dir = CFG_PLUGINDIR or "./plugins/";
 
-local io_open = io.open;
-local loadstring = loadstring;
+local io_open, os_time = io.open, os.time;
+local loadstring, pairs = loadstring, pairs;
+
+local datamanager = require "util.datamanager";
 
 module "pluginloader"
 
+local function load_from_datastore(name)
+	local content = datamanager.load(name, "*", "plugins");
+	if not content or not content[1] then return nil, "Resource not found"; end
+	return content[1], name;
+end
+
 local function load_file(name)
 	local file, err = io_open(plugin_dir..name);
 	if not file then return file, err; end
@@ -22,16 +30,36 @@
 	return content, name;
 end
 
-function load_resource(plugin, resource)
+function load_resource(plugin, resource, loader)
 	if not resource then
 		resource = "mod_"..plugin..".lua";
 	end
-	local content, err = load_file(plugin.."/"..resource);
-	if not content then content, err = load_file(resource); end
+	loader = loader or load_file;
+
+	local content, err = loader(plugin.."/"..resource);
+	if not content then content, err = loader(resource); end
 	-- TODO add support for packed plugins
+	
+	if not content and loader == load_file then
+		return load_resource(plugin, resource, load_from_datastore);
+	end
+	
 	return content, err;
 end
 
+function store_resource(plugin, resource, content, metadata)
+	if not resource then
+		resource = "mod_"..plugin..".lua";
+	end
+	local store = { content };
+	if metadata then
+		for k,v in pairs(metadata) do
+			store[k] = v;
+		end
+	end
+	datamanager.store(plugin.."/"..resource, "*", "plugins", store);
+end
+
 function load_code(plugin, resource)
 	local content, err = load_resource(plugin, resource);
 	if not content then return content, err; end