Changeset

260:1fdd201c1d43

Merge Zash->trunk
author Matthew Wild <mwild1@gmail.com>
date Sun, 10 Oct 2010 21:37:44 +0100
parents 259:d137515e0701 (diff) 253:7410d1005fea (current diff)
children 261:0f46fb2dbc79
files
diffstat 3 files changed, 153 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/mod_adhoc/adhoc/mod_adhoc.lua	Sun Sep 26 18:23:24 2010 +0200
+++ b/mod_adhoc/adhoc/mod_adhoc.lua	Sun Oct 10 21:37:44 2010 +0100
@@ -17,25 +17,34 @@
 module:hook("iq/host/"..xmlns_disco.."#info:query", function (event)
 	local origin, stanza = event.origin, event.stanza;
 	local node = stanza.tags[1].attr.node;
-	if stanza.attr.type == "get" and node
-	    and commands[node] then
-		-- Required for Prosody <= 0.7
-		local privileged = is_admin(stanza.attr.from)
-		    or is_admin(stanza.attr.from, stanza.attr.to);
-		if (commands[node].permission == "admin" and privileged)
-		    or (commands[node].permission == "user") then
+	if stanza.attr.type == "get" and node then
+		if commands[node] then
+			-- Required for Prosody <= 0.7
+			local privileged = is_admin(stanza.attr.from)
+			    or is_admin(stanza.attr.from, stanza.attr.to);
+			if (commands[node].permission == "admin" and privileged)
+			    or (commands[node].permission == "user") then
+				reply = st.reply(stanza);
+				reply:tag("query", { xmlns = xmlns_disco.."#info",
+				    node = node });
+				reply:tag("identity", { name = commands[node].name,
+				    category = "automation", type = "command-node" }):up();
+				reply:tag("feature", { var = xmlns_cmd }):up();
+				reply:tag("feature", { var = "jabber:x:data" }):up();
+			else
+				reply = st.error_reply(stanza, "auth", "forbidden", "This item is not available to you");
+			end
+			origin.send(reply);
+			return true;
+		elseif node == xmlns_cmd then
 			reply = st.reply(stanza);
 			reply:tag("query", { xmlns = xmlns_disco.."#info",
 			    node = node });
-			reply:tag("identity", { name = commands[node].name,
-			    category = "automation", type = "command-node" }):up();
-			reply:tag("feature", { var = xmlns_cmd }):up();
-			reply:tag("feature", { var = "jabber:x:data" }):up();
-		else
-			reply = st.error_reply(stanza, "auth", "forbidden", "This item is not available to you");
+			reply:tag("identity", { name = "Ad-Hoc Commands",
+			    category = "automation", type = "command-list" }):up();
+			origin.send(reply);
+			return true;
 		end
-		origin.send(reply);
-		return true;
 	end
 end);
 
--- a/mod_smacks/mod_smacks.lua	Sun Sep 26 18:23:24 2010 +0200
+++ b/mod_smacks/mod_smacks.lua	Sun Oct 10 21:37:44 2010 +0100
@@ -4,11 +4,13 @@
 local math_min = math.min;
 local tonumber, tostring = tonumber, tostring;
 local add_filter = require "util.filters".add_filter;
+local timer = require "util.timer";
 
 local xmlns_sm = "urn:xmpp:sm:2";
 
 local sm_attr = { xmlns = xmlns_sm };
 
+local resume_timeout = 300;
 local max_unacked_stanzas = 0;
 
 module:add_event_hook("stream-features",
@@ -30,11 +32,11 @@
 			local queue = {};
 			session.outgoing_stanza_queue = queue;
 			session.last_acknowledged_stanza = 0;
-			local _send = session.send;
-			function session.send(stanza)
+			local _send = session.sends2s or session.send;
+			local function new_send(stanza)
 				local attr = stanza.attr;
 				if attr and not attr.xmlns then -- Stanza in default stream namespace
-					queue[#queue+1] = st.reply(stanza);
+					queue[#queue+1] = st.clone(stanza);
 				end
 				local ok, err = _send(stanza);
 				if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
@@ -44,6 +46,12 @@
 				return ok, err;
 			end
 			
+			if session.sends2s then
+				session.sends2s = new_send;
+			else
+				session.send = new_send;
+			end
+			
 			session.handled_stanza_count = 0;
 			add_filter(session, "stanzas/in", function (stanza)
 				if not stanza.attr.xmlns then
@@ -66,7 +74,7 @@
 	end
 	module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
 	-- Reply with <a>
-	origin.send(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
+	(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
 	return true;
 end);
 
@@ -74,6 +82,7 @@
 	if not origin.smacks then return; end
 	origin.awaiting_ack = nil;
 	-- Remove handled stanzas from outgoing_stanza_queue
+	--log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
 	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
 	local queue = origin.outgoing_stanza_queue;
 	if handled_stanza_count > #queue then
@@ -101,12 +110,12 @@
 	if #queue > 0 then
 		session.outgoing_stanza_queue = {};
 		for i=1,#queue do
-			local reply = queue[i];
+			local reply = st.reply(queue[i]);
 			if reply.attr.to ~= session.full_jid then
 				reply.attr.type = "error";
 				reply:tag("error", error_attr)
 					:tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"});
-				core_process_stanza(session, queue[i]);
+				core_process_stanza(session, reply);
 			end
 		end
 	end
@@ -115,14 +124,26 @@
 local _destroy_session = sessionmanager.destroy_session;
 function sessionmanager.destroy_session(session, err)
 	if session.smacks then
-		local queue = session.outgoing_stanza_queue;
-		if #queue > 0 then
-			module:log("warn", "Destroying session with %d unacked stanzas:", #queue);
-			for i=1,#queue do
-				module:log("warn", "::%s", tostring(queue[i]));
+		if not session.resumption_token then
+			local queue = session.outgoing_stanza_queue;
+			if #queue > 0 then
+				module:log("warn", "Destroying session with %d unacked stanzas:", #queue);
+				for i=1,#queue do
+					module:log("warn", "::%s", tostring(queue[i]));
+				end
+				handle_unacked_stanzas(session);
 			end
-			handle_unacked_stanzas(session);
+		else
+			session.hibernating = true;
+			timer.add_task(resume_timeout, function ()
+				if session.hibernating then
+					session.resumption_token = nil;
+					sessionmanager.destroy_session(session); -- Re-destroy
+				end
+			end);
+			return; -- Postpone destruction for now
 		end
+		
 	end
 	return _destroy_session(session, err);
 end
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_streamstats/mod_streamstats.lua	Sun Oct 10 21:37:44 2010 +0100
@@ -0,0 +1,96 @@
+local stats = prosody.stats;
+
+if not stats then
+	stats = {
+		stats = {}; conns = {};
+		
+		broadcast = function (self, stat)
+			local value = self.stats[stat];
+			for conn in pairs(self.conns) do
+				conn:write(stat..":"..value.."\n");
+			end
+		end;
+
+		adjust = function (self, stat, delta)
+			if delta == 0 then return; end
+			self.stats[stat] = (self.stats[stat] or 0) + delta;
+			self:broadcast(stat);
+		end;
+
+		set = function (self, stat, value)
+			if value == self.stats[stat] then return; end
+			self.stats[stat] = value;
+			self:broadcast(stat);
+		end;
+		
+		add_conn = function (self, conn)
+			self.conns[conn] = true;
+			for stat, value in pairs(self.stats) do
+				conn:write(stat..":"..value.."\n");
+			end
+		end;
+		
+		remove_conn = function (self, conn)
+			self.conns[conn] = nil;
+		end;
+	};
+	prosody.stats = stats;
+	
+	local network = {};
+	
+	function network.onconnect(conn)
+		stats:add_conn(conn);
+	end
+	
+	function network.onincoming(conn, data)
+	end
+	
+	function network.ondisconnect(conn, reason)
+		stats:remove_conn(conn);
+	end
+	
+	require "util.iterators";
+	require "util.timer".add_task(1, function ()
+		stats:set("s2s-in", count(keys(prosody.incoming_s2s)));
+		return math.random(10, 20);
+	end);
+	require "util.timer".add_task(3, function ()
+		local s2sout_count = 0;
+		for _, host in pairs(prosody.hosts) do
+			s2sout_count = s2sout_count + count(keys(host.s2sout));
+		end
+		stats:set("s2s-out", s2sout_count);
+		return math.random(10, 20);
+	end);
+	
+	require "net.connlisteners".register("stats", network);
+	require "net.connlisteners".start("stats", { port = module:get_option("stats_ports") or 5444, interface = "127.0.0.1" });
+end
+
+module:hook("resource-bind", function ()
+	stats:adjust("c2s", 1);
+end);
+module:hook("resource-unbind", function ()
+	stats:adjust("c2s", -1);
+end);
+
+local c2s_count = 0;
+for username, user in pairs(hosts[module.host].sessions or {}) do
+	for resource, session in pairs(user.sessions or {}) do
+		c2s_count = c2s_count + 1;
+	end
+end
+stats:adjust("c2s", c2s_count);
+
+module:hook("s2sin-established", function (event)
+	stats:adjust("s2s-in", 1);
+end);
+module:hook("s2sin-destroyed", function (event)
+	stats:adjust("s2s-in", -1);
+end);
+module:hook("s2sout-established", function (event)
+	stats:adjust("s2s-out", 1);
+end);
+module:hook("s2sout-destroyed", function (event)
+	stats:adjust("s2s-out", -1);
+end);