Comparison

mod_pubsub_feed/mod_pubsub_feed.lua @ 322:637dc0a04052

mod_pubsub_feed: Implement PubSubHubbub subscriber
author Kim Alvefur <zash@zash.se>
date Mon, 31 Jan 2011 03:37:16 +0100
parent 300:b81e4f86a231
child 323:433bf7dc3e7a
comparison
equal deleted inserted replaced
321:661f64627fed 322:637dc0a04052
1 -- Fetches Atom feeds and publishes to PubSub nodes 1 -- Fetches Atom feeds and publishes to PubSub nodes
2 --
3 -- Depends: http://code.matthewwild.co.uk/lua-feeds
2 -- 4 --
3 -- Config: 5 -- Config:
4 -- Component "pubsub.example.com" "pubsub" 6 -- Component "pubsub.example.com" "pubsub"
5 -- modules_enabled = { 7 -- modules_enabled = {
6 -- "pubsub_feed"; 8 -- "pubsub_feed";
7 -- } 9 -- }
8 -- feeds = { -- node -> url 10 -- feeds = { -- node -> url
9 -- prosody_blog = "http://blog.prosody.im/feed/atom.xml"; 11 -- prosody_blog = "http://blog.prosody.im/feed/atom.xml";
10 -- } 12 -- }
11 -- feed_pull_interval = 20 -- minutes 13 -- feed_pull_interval = 20 -- minutes
14 --
15 -- Reference
16 -- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html
12 17
-- Modules loaded on this host; entries are published through
-- modules.pubsub.service, so mod_pubsub is expected to be present.
local modules = hosts[module.host].modules;
local pubsub_loaded = modules.pubsub and true or false;
if not pubsub_loaded then
	--FIXME Should this throw an error() instead?
	module:log("warn", "Pubsub needs to be loaded on this host");
end
23
24
25 local t_insert = table.insert;
17 local add_task = require "util.timer".add_task; 26 local add_task = require "util.timer".add_task;
18 local date, time = os.date, os.time; 27 local date, time = os.date, os.time;
19 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime; 28 local dt_parse, dt_datetime = require "util.datetime".parse, require "util.datetime".datetime;
20 local http = require "net.http"; 29 local http = require "net.http";
21 local parse_feed = require "feeds".feed_from_string; 30 local parse_feed = require "feeds".feed_from_string;
22 local st = require "util.stanza"; 31 local st = require "util.stanza";
32 local httpserver = require "net.httpserver";
33 local formencode = require "net.http".formencode;
34 local dump = require "util.serialization".serialize;
35
36 local urldecode = require "net.http".urldecode;
37 local urlencode = require "net.http".urlencode;
-- Decode one component of an application/x-www-form-urlencoded string.
-- In form encoding "+" stands for a space; urldecode() only handles %XX
-- escapes, so "+" is translated first (a literal plus arrives as "%2B").
local function form_decode(s)
	return urldecode((s:gsub("%+", " ")));
end

-- Parse a URL query string (without the leading "?").
-- Stand-in until net.http provides a query parser of its own (FIXME).
-- @param s raw query string
-- @return table of decoded key -> value pairs; if s contains no "=" at all,
--         the decoded string itself is returned instead.
local urlparams = --require "net.http".getQueryParams or whatever MattJ names it, FIXME
function(s)
	if not s:match("=") then return form_decode(s); end
	local r = {};
	for k, v in s:gmatch("([^=&]*)=([^&]*)") do
		r[form_decode(k)] = form_decode(v);
	end
	return r;
end;
23 48
-- Feeds to mirror, as pubsub node name -> Atom feed URL. These two are used
-- when the "feeds" option is not set.
local default_feeds = {
	planet_jabber = "http://planet.jabber.org/atom.xml";
	prosody_blog = "http://blog.prosody.im/feed/atom.xml";
};
local config = module:get_option("feeds") or default_feeds;
-- "feed_pull_interval" is configured in minutes (default 15); kept in seconds.
local refresh_interval = 60 * module:get_option_number("feed_pull_interval", 15);
-- Whether to subscribe to hubs advertised by fetched feeds (default: on).
local use_pubsubhubub = module:get_option_boolean("use_pubsubhubub", true); -- HTTP by default or not?

-- Per-feed state keyed by node name. last_update starts at 0 so that every
-- feed is due for polling on the first refresh.
local feed_list = { };
for node_name, feed_url in pairs(config) do
	feed_list[node_name] = {
		node = node_name;
		url = feed_url;
		last_update = 0;
	};
end
33 59
-- Reason phrases for the status codes this module answers with.
local response_codes = {
	["200"] = "OK";
	["202"] = "Accepted";
	["400"] = "Bad Request";
	["404"] = "Not Found";
	["501"] = "Not Implemented";
};

-- Build a response table for net.httpserver.
-- @param code numeric status (e.g. 202) or a complete status line ("200 OK")
-- @param headers optional header table (defaults to an empty table)
-- @param body optional body; defaults to a small HTML page naming the status
-- @return table with status, headers and body fields
local function http_response(code, headers, body)
	local reason = response_codes[tostring(code)];
	local status = code;
	if type(code) == "number" then
		-- Codes missing from response_codes used to raise
		-- "attempt to concatenate a nil value"; fall back to a generic phrase.
		status = code .. " " .. (reason or "Unknown");
	end
	return {
		status = status;
		headers = headers or {};
		body = body or "<h1>" .. (reason or tostring(code)) .. "</h1>\n";
	};
end
73
-- Identity used as the publisher for every pubsub operation: "<host>/<module name>".
local actor = table.concat({ module.host, module.name }, "/");
75
-- Publish `xitem` as item `id` on pubsub `node`, creating the node first when
-- the service reports "item-not-found".
-- Returns false when node creation or the retried publish fails (the caller
-- then stops processing this feed); any other publish error is only logged
-- and true is returned so the caller continues with the next entry.
local function publish_entry(node, id, xitem)
	local service = modules.pubsub.service;
	module:log("debug", "publishing to %s, id %s", node, id);
	local ok, err = service:publish(node, actor, id, xitem);
	if ok then
		return true;
	end
	if err ~= "item-not-found" then
		module:log("error", "publishing %s failed: %s", node, err);
		return true;
	end
	module:log("debug", "got item-not-found, creating %s and trying again", node);
	ok, err = service:create(node, actor);
	if not ok then
		module:log("error", "could not create node %s: %s", node, err);
		return false;
	end
	ok, err = service:publish(node, actor, id, xitem);
	if not ok then
		module:log("error", "could not create or publish node %s: %s", node, err);
		return false;
	end
	return true;
end

-- Parsed timestamp of the Atom child element `name` of `entry`, or nil when
-- the element is missing (and, presumably, when util.datetime cannot parse
-- its text -- not verified here).
local function entry_time(entry, name)
	local child = entry:get_child(name);
	local text = child and child:get_text();
	return text and dt_parse(text);
end

-- Parse item.data as a feed and publish to pubsub node item.node every entry
-- that has no date or is newer than item.last_update.
-- Afterwards, if PubSubHubbub support is enabled and no subscription has been
-- started for this feed yet, subscribe to the hub the feed advertises (if any).
-- @param item feed state from feed_list (node, url, data, last_update, ...)
function update_entry(item)
	local node = item.node;
	if not modules.pubsub then
		-- Without mod_pubsub there is nowhere to publish to.
		module:log("error", "cannot update node %s: pubsub is not loaded on this host", node);
		return;
	end
	-- pcall: a malformed or missing document must not blow up the HTTP
	-- callback or timer that invoked us (previously ipairs(nil) crashed).
	local parsed, feed = pcall(parse_feed, item.data);
	if not parsed or not feed then
		module:log("warn", "could not parse feed for node %s: %s", node, tostring(feed));
		return;
	end
	module:log("debug", "updating node %s", node);
	for _, entry in ipairs(feed) do
		entry.attr.xmlns = "http://www.w3.org/2005/Atom";

		local timestamp = entry_time(entry, "updated") or entry_time(entry, "published");
		if not timestamp or not item.last_update or timestamp > item.last_update then
			local id = entry:get_child("id");
			id = id and id:get_text() or item.url.."#"..dt_datetime(timestamp); -- Missing id, so make one up
			local xitem = st.stanza("item", { id = id }):add_child(entry);
			-- TODO Put data from /feed into item/source
			if not publish_entry(node, id, xitem) then
				return;
			end
		end
	end

	if use_pubsubhubub and not item.subscription then
		module:log("debug", "check if %s has a hub", item.node);
		local hub = feed.links and feed.links.hub;
		if hub then
			item.hub = hub;
			module:log("debug", "%s has a hub: %s", item.node, item.hub);
			subscribe(item);
		end
	end
end
131
-- Poll item.url over HTTP.
-- On 200 the body is stored in item.data and handed to `callback` (if any).
-- On 200 or 304, item.last_update is set to the current time.
-- Once a body has been stored, the request is made conditional with
-- If-Modified-Since.
-- @param item feed state from feed_list
-- @param callback optional function(item) run after a successful (200) fetch
function fetch(item, callback) -- HTTP Pull
	local headers = { };
	if item.data and item.last_update then
		-- HTTP-date (IMF-fixdate) requires the literal zone "GMT". "%Z" is
		-- C-library dependent (e.g. "UTC" or a local zone name), and "%T" is
		-- not available in every strftime, so spell both out.
		-- NOTE(review): %a/%b assume the C locale is in effect.
		headers["If-Modified-Since"] = date("!%a, %d %b %Y %H:%M:%S GMT", item.last_update);
	end
	http.request(item.url, { headers = headers }, function(data, code, req)
		if code == 200 then
			item.data = data;
			if callback then callback(item) end
			item.last_update = time();
		elseif code == 304 then
			item.last_update = time();
		end
	end);
end
50 148
-- Timer callback: poll every feed that has not been updated recently and feed
-- the result to update_entry(). Returns refresh_interval so util.timer
-- schedules the next run.
function refresh_feeds()
	local now = time();
	-- fetch() stamps last_update when the response arrives, i.e. slightly
	-- after the tick that started the poll. A strict "older than one whole
	-- interval" test therefore tends to skip that feed on the following tick,
	-- so it would only be polled every other interval. Half an interval of
	-- slack absorbs that delay, while feeds refreshed by a recent hub push
	-- are still skipped.
	local due_age = refresh_interval / 2;
	for _, item in pairs(feed_list) do
		--FIXME Don't fetch feeds which have a subscription
		-- On the other hand, what if the subscription expires or breaks?
		if now - item.last_update >= due_age then
			module:log("debug", "checking %s", item.node);
			fetch(item, update_entry);
		end
	end
	return refresh_interval;
end

-- Send a PubSubHubbub 0.3 subscription request for feed.url to feed.hub,
-- asking for asynchronous verification. The hub is told to call back on
-- http://<host>:5280/callback?node=<node>.
-- @param feed feed state from feed_list (node, url, hub, subscription)
-- @param challenge optional; sent as hub.verify_token, and when present a
--        202 reply from the hub marks the subscription "active"
function subscribe(feed, challenge)
	local params = {
		["hub.callback"] = "http://"..module.host..":5280/callback?node=" .. urlencode(feed.node); --FIXME figure out your own hostname reliably?
		["hub.mode"] = "subscribe"; --TODO unsubscribe
		["hub.topic"] = feed.url;
		["hub.verify"] = "async";
		["hub.verify_token"] = challenge;
		--["hub.lease_seconds"] = "";
	};
	-- formencode() expects an array of { name = ..., value = ... } pairs,
	-- not a name -> value map.
	local form = {};
	for name, value in pairs(params) do
		t_insert(form, { name = name, value = value });
	end
	local body = formencode(form);

	--FIXME The subscription states and related stuff
	feed.subscription = "asking";
	http.request(feed.hub, { body = body }, function(data, code, req)
		local status = tostring(code);
		module:log("debug", "subscription to %s submitted, status %s", feed.node, status);
		if status == "204" then
			-- The hub verified synchronously and accepted the subscription.
			module:log("debug", "subscribe to %s confirmed", feed.node);
			feed.subscription = "active";
		elseif status == "202" then
			if challenge then
				module:log("debug", "subscribe to %s confirmed", feed.node);
				feed.subscription = "active";
			else
				-- Accepted; the hub will verify with a GET to the callback URL.
				module:log("debug", "subscription to %s submitted", feed.node);
			end
		else
			-- Rejected or failed. Clear the pending state so a later
			-- update_entry() can retry; staying in "asking" would block every
			-- further subscription attempt for this feed.
			module:log("warn", "subscription to %s failed, status %s", feed.node, status);
			feed.subscription = nil;
		end
	end);
end
195
-- net.httpserver handler for the PubSubHubbub callback URL
-- (/callback?node=<node>).
--  GET:  hub verification of intent. If the hub confirms a "subscribe" for
--        the topic of a feed whose subscription we started (or that is
--        active and being re-verified), echo hub.challenge with a 200.
--        Anything else gets a 404, which is how PuSH 0.3 says to refuse.
--  POST: content distribution; the body is treated as the new feed document
--        for the node named in the query string.
-- @return response table for net.httpserver
function handle_http_request(method, body, request)
	local query = request.url.query;
	if type(query) == "string" then
		query = urlparams(query);
	end
	if type(query) ~= "table" then
		-- Absent query string, or one without key=value pairs; previously
		-- this crashed on query.node.
		query = {};
	end
	local feed = query.node and feed_list[query.node];

	-- TODO http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#authednotify
	-- Until then any POST naming a known node is accepted as new content.

	if method == "GET" then
		local challenge = query["hub.challenge"];
		local expected = feed and (feed.subscription == "asking" or feed.subscription == "active");
		if challenge and expected and query["hub.mode"] == "subscribe"
				and query["hub.topic"] == feed.url then
			module:log("debug", "got a challenge for %s: %s", feed.node, challenge);
			feed.subscription = "active";
			-- The hub expects the challenge echoed back as the response body.
			return http_response("200 OK", nil, challenge);
		end
		return http_response("404 Not Found", nil, "<h1>Not Found</h1>\n");
	elseif method == "POST" then
		if feed and body and #body > 0 then
			module:log("debug", "got %d bytes PuSHed for %s", #body, query.node);
			feed.data = body;
			update_entry(feed);
			feed.last_update = time();
			return http_response(202);
		end
		return http_response(400);
	end
	return http_response(501);
end
100 230
-- Start-up hook. When PubSubHubbub is enabled, open the HTTP listener that
-- receives hub callbacks; in every case schedule the first feed refresh to
-- run immediately.
function init()
	module:log("debug", "initiating", module.name);
	if use_pubsubhubub then
		httpserver.new({
			base = "callback";
			port = 5280;
			handler = handle_http_request;
		});
	end
	add_task(0, refresh_feeds);
end
104 238
-- Before the server has started, defer init() to the "server-started" event.
-- Once it is running (e.g. the module is loaded at runtime), initialise now.
if not prosody.start_time then
	prosody.events.add_handler("server-started", init);
else
	init();
end
110