Software / code / prosody-modules
Comparison
mod_pubsub_hub/mod_pubsub_hub.lua @ 764:d11d91ee81ed
mod_pubsub_hub: New module that implements the Hub part of PubSubHubbub
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Wed, 01 Aug 2012 16:11:12 +0200 |
| child | 766:1184fe8ebb21 |
comparison
equal
deleted
inserted
replaced
| 763:bcf0c9fff512 | 764:d11d91ee81ed |
|---|---|
| 1 -- Copyright (C) 2011 - 2012 Kim Alvefur | |
| 2 -- | |
| 3 -- This file is MIT/X11 licensed. | |
| 4 | |
| 5 local http = require "net.http"; | |
| 6 local formdecode = http.formdecode; | |
| 7 local formencode = http.formencode; | |
| 8 local uuid = require "util.uuid".generate; | |
| 9 local hmac_sha1 = require "util.hmac".sha1; | |
| 10 local json_encode = require "util.json".encode; | |
| 11 local time = os.time; | |
| 12 local m_min, m_max = math.min, math.max; | |
| 13 local tostring = tostring; | |
-- XML namespaces used to recognise pubsub event notifications.
local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
local xmlns_pubsub_event = xmlns_pubsub .. "#event";

-- Subscription store obtained from module:shared; the rest of this file
-- indexes it as subs_by_topic[topic][callback].
local subs_by_topic = module:shared("subscriptions");

-- Upper bound, lower bound and default for subscription leases (seconds).
local max_lease = 86400;
local min_lease = 600;
local default_lease = 3600;

module:depends("pubsub");

-- Accepted values of the hub.mode request parameter.
local valid_modes = {
	subscribe = true,
	unsubscribe = true,
};
| 23 | |
--- Subscribe this host to the subscription's topic on the local pubsub
-- service, but only when the subscription has reached the "subscribed"
-- state. The outcome of add_subscription() is logged (debug on success,
-- error otherwise).
-- @param subscription table carrying at least `state` and `topic`
local function do_subscribe(subscription)
	-- FIXME handle other states
	if subscription.state ~= "subscribed" then
		return;
	end
	local service = hosts[module.host].modules.pubsub.service;
	local ok, err = service:add_subscription(subscription.topic, true, module.host);
	local level = ok and "debug" or "error";
	module:log(level, "add_subscription() => %s, %s", tostring(ok), tostring(err));
end
| 31 | |
-- Build the verification URL: the subscriber's callback with the hub.*
-- parameters appended to its query string.
local function verification_url(callback, params)
	local separator = callback:find("?", 1, true) and "&" or "?";
	return callback .. separator .. formencode(params);
end

-- Commit an intent that the subscriber has just verified.
-- A verified "unsubscribe" removes the entry from the store; a verified
-- "subscribe" activates it, applies the requested secret and lease, and
-- registers with the pubsub service via do_subscribe().
-- The new secret/lease are only applied here, so a failed verification
-- never alters an already active subscription.
local function commit_intent(subscription, lease_seconds, secret)
	local intent = subscription.want_state;
	if not intent then
		module:log("warn", "Verification of already verified request, probably");
		return;
	end
	subscription.want_state = nil;

	local topic, callback = subscription.topic, subscription.callback;
	if intent == "unsubscribe" then
		subscription.state = "unsubscribed";
		if subs_by_topic[topic] then
			subs_by_topic[topic][callback] = nil;
		end
		return;
	end

	subscription.state = "subscribed";
	subscription.secret = secret;
	subscription.lease_seconds = lease_seconds;
	subscription.expires = time() + lease_seconds;
	if not subs_by_topic[topic] then
		subs_by_topic[topic] = {};
	end
	subs_by_topic[topic][callback] = subscription;
	module:log("debug", "calling do_subscribe()");
	do_subscribe(subscription);
end

--- HTTP handler for PubSubHubbub (un)subscription requests.
--
-- Expects a form-encoded POST carrying hub.callback, hub.mode, hub.topic and
-- at least one hub.verify value. The intent is verified by sending a GET
-- request with a random challenge to the callback; the subscriber must echo
-- the challenge with a 2xx status.
--
-- @param event table with `request` and `response` fields
-- @return 202 when asynchronous verification was started;
--         true when synchronous verification was started (the response is
--         sent from the verification callback: 204 on success, 409 on
--         failure);
--         400 when no supported hub.verify mode was given;
--         an HTML error body (with status 400 set on the response) when a
--         required parameter is missing;
--         nothing for methods other than POST.
local function handle_request(event)
	local request, response = event.request, event.response;
	local method, body = request.method, request.body;

	if method ~= "POST" then
		return;
	end

	-- Accept the form content type even when it carries parameters such as
	-- "; charset=UTF-8", and compare it case-insensitively.
	local content_type = request.headers.content_type;
	if type(body) == "string" and type(content_type) == "string"
	and content_type:lower():match("^%s*([^;%s]*)") == "application/x-www-form-urlencoded" then
		body = formdecode(body);
	end

	-- Anything that did not decode into a table cannot carry the parameters.
	local valid = type(body) == "table"
		and body["hub.callback"] and body["hub.mode"] and valid_modes[body["hub.mode"]]
		and body["hub.topic"] and body["hub.verify"];
	if not valid then
		response.status = 400;
		response.headers.content_type = "text/html";
		return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n"
	end

	-- http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5
	local callback = body["hub.callback"];
	local mode = body["hub.mode"];
	local topic = body["hub.topic"];
	local secret = body["hub.secret"];
	local verify_token = body["hub.verify_token"];

	-- Clamp the requested lease into [min_lease, max_lease]; fall back to the
	-- default when the parameter is absent, unparsable or NaN.
	local lease_seconds = tonumber(body["hub.lease_seconds"]);
	if not lease_seconds or lease_seconds ~= lease_seconds then
		lease_seconds = default_lease;
	end
	lease_seconds = m_max(m_min(lease_seconds, max_lease), min_lease);

	module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), topic);

	-- hub.verify may be repeated; collect every value from the array part of
	-- the decoded form.
	local verify_modes = {};
	for i = 1, #body do
		if body[i].name == "hub.verify" then
			verify_modes[body[i].value] = true;
		end
	end
	module:log("debug", "verify modes: %s", require"util.serialization".serialize(verify_modes));

	-- Reject before touching the store when no supported mode was offered.
	if not (verify_modes["async"] or verify_modes["sync"]) then
		return 400;
	end

	if not subs_by_topic[topic] then
		subs_by_topic[topic] = {};
	end
	local subscription = subs_by_topic[topic][callback];
	if not subscription then
		subscription = {
			id = uuid(),
			callback = callback,
			topic = topic,
			state = "unsubscribed",
			secret = secret,
			lease_seconds = lease_seconds,
			expires = time() + lease_seconds,
		};
		subs_by_topic[topic][callback] = subscription;
	end
	-- Always record the latest intent, also for an existing subscription, so
	-- that unsubscribing or renewing a verified subscription is acted upon.
	subscription.want_state = mode;
	-- Kept so that later re-confirmation requests can echo the token.
	subscription.verify_token = verify_token;

	local challenge = uuid();
	local callback_url = verification_url(callback, {
		["hub.mode"] = mode,
		["hub.topic"] = topic,
		["hub.challenge"] = challenge,
		["hub.lease_seconds"] = tostring(lease_seconds),
		["hub.verify_token"] = verify_token,
	});

	-- True when the subscriber echoed the challenge with a 2xx status.
	local function is_verified(reply, code)
		local status = tonumber(code) or 0;
		return reply == challenge and status > 199 and status < 300;
	end

	if verify_modes["async"] then
		module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription.id));
		http.request(callback_url, nil, function(reply, code)
			if is_verified(reply, code) then
				commit_intent(subscription, lease_seconds, secret);
			else
				module:log("warn", "status %d and body was %q", tonumber(code) or -1, tostring(reply));
			end
		end);
		return 202;
	end

	-- Synchronous verification: the HTTP response is deferred until the
	-- subscriber has answered, and is always sent exactly once.
	http.request(callback_url, nil, function(reply, code)
		local verified = is_verified(reply, code);
		if verified then
			commit_intent(subscription, lease_seconds, secret);
		else
			module:log("warn", "status %d and body was %q", tonumber(code) or -1, tostring(reply));
		end
		-- 204 confirms the request; a failed verification must be reported
		-- as an error (409, as the reference hub does).
		response.status = verified and 204 or 409;
		response:send();
	end);
	return true;
end
| 143 | |
--- Timer callback that maintains subscription leases.
--
-- For every verified ("subscribed") subscription:
--   * already expired            -> dropped from the store;
--   * expiring within min_lease  -> re-confirmed with the subscriber, and
--                                   the lease is renewed if the subscriber
--                                   echoes the challenge with a 2xx status;
--   * otherwise                  -> only used to schedule the next run.
-- Entries flagged with want_state == "remove" are dropped as well.
-- Subscriptions that are not in the "subscribed" state are left untouched.
--
-- @return number of seconds until the timer should fire again: min_lease
--         before the earliest known expiry, but never less than min_lease.
--         Subscriptions created after this value was computed are only
--         noticed on the next run.
local function periodic()
	local now = time();
	-- Absolute time of the earliest upcoming expiry seen so far.
	local next_check = now + max_lease;

	for topic, callbacks in pairs(subs_by_topic) do
		-- Clearing existing keys while traversing with pairs() is permitted.
		for callback, subscription in pairs(callbacks) do
			if subscription.want_state == "remove" then
				callbacks[callback] = nil;
			elseif subscription.state == "subscribed" then
				if subscription.expires < now then
					-- Subscription has expired, drop it.
					callbacks[callback] = nil;
				else
					next_check = m_min(next_check, subscription.expires);
					if subscription.expires < now + min_lease then
						-- Subscription set to expire soon, re-confirm it.
						local challenge = uuid();
						local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{
							-- The protocol keyword is "subscribe"; the stored
							-- state ("subscribed") is not a valid hub.mode.
							["hub.mode"] = "subscribe",
							["hub.topic"] = topic,
							["hub.challenge"] = challenge,
							["hub.lease_seconds"] = tostring(subscription.lease_seconds),
							["hub.verify_token"] = subscription.verify_token,
						}
						http.request(callback_url, nil, function(body, code)
							local status = tonumber(code) or 0;
							if body == challenge and status > 199 and status < 300 then
								-- Renew from the time of confirmation, not
								-- from when this timer run started.
								subscription.expires = time() + subscription.lease_seconds;
							else
								module:log("debug", "Re-confirmation of %s for %s failed with status %d",
									tostring(callback), tostring(topic), status);
							end
						end);
					end
				end
			end
		end
	end

	-- The timer expects a delay relative to now, not an absolute timestamp.
	return m_max(next_check - now - min_lease, min_lease);
end
| 188 | |
--- Deliver one published payload to a subscriber's callback URL via POST.
-- When the subscription has a secret, the request carries an
-- X-Hub-Signature header computed with hmac_sha1 over the payload.
-- The delivery result is only logged.
-- @param subscription table with `callback` and optionally `secret`
-- @param content payload; converted with tostring() to form the body
local function on_notify(subscription, content)
	local payload = tostring(content);
	local request_headers = { ["Content-Type"] = "application/xml" };
	if subscription.secret then
		local signature = hmac_sha1(subscription.secret, payload, true);
		request_headers["X-Hub-Signature"] = "sha1="..signature;
	end

	local function on_response(_, code)
		if not (code >= 200 and code <= 299) then
			module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback));
			-- TODO Retry
			-- ... but the spec says that you should not retry, wtf?
			return;
		end
		module:log("debug", "Delivered");
	end

	local options = { method = "POST", body = payload, headers = request_headers };
	http.request(subscription.callback, options, on_response);
end
| 207 | |
-- Relay pubsub event notifications that this host receives from itself to
-- the HTTP subscribers of the corresponding topic (the pubsub node name).
module:hook("message/host", function(event)
	local stanza = event.stanza;
	if stanza.attr.from ~= module.host then return end;

	for pubsub_event in stanza:childtags("event", xmlns_pubsub_event) do
		local items = pubsub_event:get_child("items");
		-- An event without <items/> must not be dereferenced.
		local node = items and items.attr.node;
		local subscriptions = node and subs_by_topic[node];
		if subscriptions then
			for item in items:childtags("item") do
				local content = item.tags[1];
				-- Items without a payload have nothing to deliver.
				if content then
					for _, subscription in pairs(subscriptions) do
						-- Only verified subscriptions may receive content;
						-- pending or unsubscribed entries are skipped.
						if subscription.state == "subscribed" then
							on_notify(subscription, content)
						end
					end
				end
			end
		end
	end
	return true;
end, 10);
| 226 | |
-- Copy the subscriptions of one topic for public display, leaving out the
-- fields that must stay private: `secret` is the key used to sign
-- deliveries and `verify_token` is what the subscriber uses to recognise
-- genuine verification requests.
local function public_subscriptions(callbacks)
	local view = {};
	for callback, subscription in pairs(callbacks) do
		local copy = {};
		for key, value in pairs(subscription) do
			if key ~= "secret" and key ~= "verify_token" then
				copy[key] = value;
			end
		end
		view[callback] = copy;
	end
	return view;
end

module:depends"http";
module:provides("http", {
	default_path = "/hub";
	route = {
		-- Subscription and unsubscription requests.
		POST = handle_request;
		-- JSON listing of every topic and its subscriptions.
		GET = function()
			local view = {};
			for topic, callbacks in pairs(subs_by_topic) do
				view[topic] = public_subscriptions(callbacks);
			end
			return json_encode(view);
		end;
		-- JSON listing of the subscriptions of a single topic.
		["GET /topic/*"] = function(event, path)
			local callbacks = subs_by_topic[path];
			if not callbacks then
				return 404;
			end
			return json_encode(public_subscriptions(callbacks))
		end;
	};
});

-- Start lease maintenance; periodic() returns the delay until its next run.
module:add_timer(1, periodic);