Diff

mod_pubsub_feeds/mod_pubsub_feeds.lua @ 670:9d8efb804a00

mod_pubsub_feed: Rename to mod_pubsub_feeds
author Kim Alvefur <zash@zash.se>
date Wed, 23 May 2012 21:52:14 +0200
parent 668:343b115ebbea
child 717:e79147fb39f9
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_feeds/mod_pubsub_feeds.lua	Wed May 23 21:52:14 2012 +0200
@@ -0,0 +1,259 @@
+-- Fetches Atom feeds and publishes to PubSub nodes
+--
+-- Depends: http://code.matthewwild.co.uk/lua-feeds
+--
+-- Config:
+-- Component "pubsub.example.com" "pubsub"
+-- modules_enabled = {
+--   "pubsub_feed";
+-- }
+-- feeds = { -- node -> url
+--   prosody_blog = "http://blog.prosody.im/feed/atom.xml";
+-- }
+-- feed_pull_interval = 20 -- minutes
+--
+-- Reference
+-- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html
+
+local modules = hosts[module.host].modules;
+if not modules.pubsub or module:get_option("component_module") ~= "pubsub" then
+	module:log("warn", "Pubsub needs to be loaded on this host");
+	--module:log("debug", "component_module is %s", tostring(module:get_option("component_module")));
+	return
+end
+
+local date, time = os.date, os.time;
+local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime;
+local uuid = require "util.uuid".generate;
+local hmac_sha1 = require "util.hmac".sha1;
+local parse_feed = require "feeds".feed_from_string;
+local st = require "util.stanza";
+--local dump = require"util.serialization".serialize;
+
+local xmlns_atom = "http://www.w3.org/2005/Atom";
+
+local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true);
+if use_pubsubhubub then
+	module:depends"http";
+end
+
+local http = require "net.http";
+local formdecode = http.formdecode;
+local formencode = http.formencode;
+local urldecode  = http.urldecode;
+local urlencode  = http.urlencode;
+
+local feed_list = module:shared("feed_list");
+local refresh_interval;
+
+-- Dynamically reloadable config.
+local function update_config()
+	local config = module:get_option("feeds") or {
+		planet_jabber = "http://planet.jabber.org/atom.xml";
+		prosody_blog = "http://blog.prosody.im/feed/atom.xml";
+	};
+	refresh_interval = module:get_option_number("feed_pull_interval", 15) * 60;
+	local new_feed_list = {};
+	for node, url in pairs(config) do
+		new_feed_list[node] = true;
+		if not feed_list[node] then
+			feed_list[node] = { url = url; node = node; last_update = 0 };
+		else
+			feed_list[node].url = url;
+		end
+	end
+	for node in pairs(feed_list) do
+		if not new_feed_list[node] then
+			feed_list[node] = nil;
+		end
+	end
+end
+update_config();
+module:hook("config-reloaded", update_config);
+
+local actor = module.host.."/"..module.name;
+
+function update_entry(item)
+	local node = item.node;
+	module:log("debug", "parsing %d bytes of data in node %s", #item.data or 0, node)
+	local feed = parse_feed(item.data);
+	for _, entry in ipairs(feed) do
+		entry.attr.xmlns = xmlns_atom;
+
+		local e_published = entry:get_child_text("published");
+		e_published = e_published and dt_parse(e_published);
+		local e_updated = entry:get_child_text("updated");
+		e_updated = e_updated and dt_parse(e_updated);
+
+		local timestamp = e_updated or e_published or nil;
+		--module:log("debug", "timestamp is %s, item.last_update is %s", tostring(timestamp), tostring(item.last_update));
+		if not timestamp or not item.last_update or timestamp > item.last_update then
+			local id = entry:get_child_text("id");
+			id = id or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up
+			local xitem = st.stanza("item", { id = id }):add_child(entry);
+			-- TODO Put data from /feed into item/source
+
+			--module:log("debug", "publishing to %s, id %s", node, id);
+			local ok, err = modules.pubsub.service:publish(node, actor, id, xitem);
+			if not ok then
+				if err == "item-not-found" then -- try again
+					--module:log("debug", "got item-not-found, creating %s and trying again", node);
+					local ok, err = modules.pubsub.service:create(node, actor);
+					if not ok then
+						module:log("error", "could not create node %s: %s", node, err);
+						return;
+					end
+					local ok, err = modules.pubsub.service:publish(node, actor, id, xitem);
+					if not ok then
+						module:log("error", "could not create or publish node %s: %s", node, err);
+						return
+					end
+				else
+					module:log("error", "publishing %s failed: %s", node, err);
+				end
+			end
+		end
+	end
+	
+	if use_pubsubhubub and not item.subscription then
+		--module:log("debug", "check if %s has a hub", item.node);
+		local hub = feed.links and feed.links.hub;
+		if hub then
+			item.hub = hub;
+			module:log("debug", "%s has a hub: %s", item.node, item.hub);
+			subscribe(item);
+		end
+	end
+end
+
+function fetch(item, callback) -- HTTP Pull
+	local headers = { };
+	if item.data and item.last_update then
+		headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S %Z", item.last_update);
+	end
+	http.request(item.url, { headers = headers }, function(data, code, req) 
+		if code == 200 then
+			item.data = data;
+			if callback then callback(item) end
+			item.last_update = time();
+		elseif code == 304 then
+			item.last_update = time();
+		end
+	end);
+end
+
+function refresh_feeds()
+	local now = time();
+	--module:log("debug", "Refreshing feeds");
+	for node, item in pairs(feed_list) do
+		--FIXME Don't fetch feeds which have a subscription
+		-- Otoho, what if the subscription expires or breaks?
+		if item.last_update + refresh_interval < now then 
+			--module:log("debug", "checking %s", item.node);
+			fetch(item, update_entry);
+		end
+	end
+	return refresh_interval;
+end
+
+local function format_url(node)
+	return module:http_url(nil, "/callback") .. "?node=" .. urlencode(node);
+end	
+
+function subscribe(feed)
+	feed.token = uuid();
+	feed.secret = uuid();
+	local body = formencode{
+		["hub.callback"] = format_url(feed.node);
+		["hub.mode"] = "subscribe"; --TODO unsubscribe
+		["hub.topic"] = feed.url;
+		["hub.verify"] = "async";
+		["hub.verify_token"] = feed.token;
+		["hub.secret"] = feed.secret;
+		--["hub.lease_seconds"] = "";
+	};
+
+	--module:log("debug", "subscription request, body: %s", body);
+
+	--FIXME The subscription states and related stuff
+	feed.subscription = "subscribe";
+	http.request(feed.hub, { body = body }, function(data, code, req) 
+		module:log("debug", "subscription to %s submitted, status %s", feed.node, tostring(code));
+		if code >= 400 then
+			module:log("error", "There was something wrong with our subscription request, body: %s", tostring(data));
+			feed.subscription = "failed";
+		end
+	end);
+end
+
+function handle_http_request(event)
+	local request = event.request;
+	local method = request.method;
+	local body = request.body;
+
+	--module:log("debug", "%s request to %s%s with body %s", method, request.url.path, request.url.query and "?" .. request.url.query or "", #body > 0 and body or "empty");
+	local query = request.url.query or {}; --FIXME
+	if query and type(query) == "string" then
+		query = formdecode(query);
+		--module:log("debug", "GET data: %s", dump(query));
+	end
+	--module:log("debug", "Headers: %s", dump(request.headers));
+
+	local feed = feed_list[query.node];
+	if method == "GET" then
+		if query.node and feed then
+			if query["hub.topic"] ~= feed.url then
+				module:log("debug", "Invalid topic: %s", tostring(query["hub.topic"]))
+				return 404
+			end
+			if query["hub.mode"] ~= feed.subscription then
+				module:log("debug", "Invalid mode: %s", tostring(query["hub.mode"]))
+				return 400
+				-- Would this work for unsubscribe?
+				-- Also, if feed.subscription is changed here,
+				-- it would probably invalidate the subscription
+				-- when/if the hub asks if it should be renewed
+			end
+			if query["hub.verify_token"] ~= feed.token then
+				module:log("debug", "Invalid verify_token: %s", tostring(query["hub.verify_token"]))
+				return 401
+			end
+			module:log("debug", "Confirming %s request to %s", feed.subscription, feed.url)
+			return query["hub.challenge"];
+		end
+		return 400;
+	elseif method == "POST" then
+		local body = request.body;
+		if #body > 0 and feed then
+			module:log("debug", "got %d bytes PuSHed for %s", #body, query.node);
+			local signature = request.headers.x_hub_signature;
+			if feed.secret then
+				local localsig = "sha1=" .. hmac_sha1(feed.secret, body, true);
+				if localsig ~= signature then
+					module:log("debug", "Invalid signature, got %s but wanted %s", tostring(signature), tostring(localsig));
+					return 401;
+				end
+				module:log("debug", "Valid signature");
+			end
+			feed.data = body;
+			update_entry(feed);
+			feed.last_update = time();
+			return 202;
+		end
+		return 400;
+	end
+	return 501;
+end
+
+if use_pubsubhubub then
+	module:provides("http", {
+		default_path = "/callback";
+		route = {
+			GET = handle_http_request;
+			POST = handle_http_request;
+			-- This all?
+		};
+	});
+end
+
+module:add_timer(1, refresh_feeds);