Software /
code /
prosody-modules
Comparison
mod_pubsub_hub/mod_pubsub_hub.lua @ 1458:e9d164e694e7
mod_pubsub_hub: Update to PubSubHubbub version 0.4
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 26 Jun 2014 19:41:32 +0200 |
parent | 1325:b21236b6b8d8 |
child | 3021:4cec8b7aed6d |
comparison
equal
deleted
inserted
replaced
1457:720ff25d94e6 | 1458:e9d164e694e7 |
---|---|
3 -- This file is MIT/X11 licensed. | 3 -- This file is MIT/X11 licensed. |
4 | 4 |
5 local http = require "net.http"; | 5 local http = require "net.http"; |
6 local formdecode = http.formdecode; | 6 local formdecode = http.formdecode; |
7 local formencode = http.formencode; | 7 local formencode = http.formencode; |
| 8 local http_request = http.request; |
8 local uuid = require "util.uuid".generate; | 9 local uuid = require "util.uuid".generate; |
9 local hmac_sha1 = require "util.hmac".sha1; | 10 local hmac_sha1 = require "util.hmac".sha1; |
10 local json_encode = require "util.json".encode; | 11 local json_encode = require "util.json".encode; |
11 local time = os.time; | 12 local time = os.time; |
12 local m_min, m_max = math.min, math.max; | 13 local m_min, m_max = math.min, math.max; |
13 local tostring = tostring; | 14 local tostring = tostring; |
| 15 |
14 local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; | 16 local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; |
15 local xmlns_pubsub_event = xmlns_pubsub .. "#event"; | 17 local xmlns_pubsub_event = xmlns_pubsub .. "#event"; |
16 local subs_by_topic = module:shared"subscriptions"; | 18 local subs_by_topic = module:shared"subscriptions"; |
17 | 19 |
18 local max_lease, min_lease, default_lease = 86400, 600, 3600; | 20 local max_lease, min_lease, default_lease = 86400, 600, 3600; |
52 local topic = body["hub.topic"]; | 54 local topic = body["hub.topic"]; |
53 local lease_seconds = m_max(min_lease, m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease)); | 55 local lease_seconds = m_max(min_lease, m_min(tonumber(body["hub.lease_seconds"]) or default_lease, max_lease)); |
54 local secret = body["hub.secret"]; | 56 local secret = body["hub.secret"]; |
55 local verify_token = body["hub.verify_token"]; | 57 local verify_token = body["hub.verify_token"]; |
56 | 58 |
57 module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), topic); | 59 module:log("debug", "topic is "..(type(topic)=="string" and "%q" or "%s"), tostring(topic)); |
58 | 60 |
59 if not subs_by_topic[topic] then | 61 if not subs_by_topic[topic] then |
60 subs_by_topic[topic] = {}; | 62 subs_by_topic[topic] = {}; |
61 end | 63 end |
62 local subscription = subs_by_topic[topic][callback]; | 64 local subscription = subs_by_topic[topic][callback]; |
73 callback = callback, | 75 callback = callback, |
74 topic = topic, | 76 topic = topic, |
75 state = "unsubscribed", | 77 state = "unsubscribed", |
76 secret = secret, | 78 secret = secret, |
77 want_state = mode, | 79 want_state = mode, |
78 lease_seconds = lease_seconds, | |
79 expires = time() + lease_seconds, | |
80 }; | 80 }; |
| 81 subscription.lease_seconds = lease_seconds; |
| 82 subscription.expires = time() + lease_seconds; |
81 subs_by_topic[topic][callback] = subscription; | 83 subs_by_topic[topic][callback] = subscription; |
82 local challenge = uuid(); | 84 local challenge = uuid(); |
83 | 85 |
84 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{ | 86 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{ |
85 ["hub.mode"] = mode, | 87 ["hub.mode"] = mode, |
86 ["hub.topic"] = topic, | 88 ["hub.topic"] = topic, |
87 ["hub.challenge"] = challenge, | 89 ["hub.challenge"] = challenge, |
88 ["hub.lease_seconds"] = tostring(lease_seconds), | 90 ["hub.lease_seconds"] = tostring(lease_seconds), |
89 ["hub.verify_token"] = verify_token, | 91 ["hub.verify_token"] = verify_token, -- COMPAT draft version 0.3 |
90 } | 92 } |
91 module:log("debug", require"util.serialization".serialize(verify_modes)); | 93 module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription)); |
92 if verify_modes["async"] then | 94 http_request(callback_url, nil, function(body, code) |
93 module:log("debug", "Sending async verification request to %s for %s", tostring(callback_url), tostring(subscription)); | 95 if body == challenge and code > 199 and code < 300 then |
94 http.request(callback_url, nil, function(body, code) | 96 if not subscription.want_state then |
95 if body == challenge and code > 199 and code < 300 then | 97 module:log("warn", "Verification of already verified request, probably"); |
96 if not subscription.want_state then | 98 return; |
97 module:log("warn", "Verification of already verified request, probably"); | |
98 return; | |
99 end | |
100 subscription.state = subscription.want_state .. "d"; | |
101 subscription.want_state = nil; | |
102 module:log("debug", "calling do_subscribe()"); | |
103 do_subscribe(subscription); | |
104 subs_by_topic[topic][callback] = subscription; | |
105 else | |
106 module:log("warn", "status %d and body was %q", tostring(code), tostring(body)); | |
107 subs_by_topic[topic][callback] = subscription; | |
108 end | 99 end |
109 end) | 100 subscription.state = subscription.want_state .. "d"; |
110 return 202; | 101 subscription.want_state = nil; |
111 elseif verify_modes["sync"] then | 102 module:log("debug", "calling do_subscribe()"); |
112 http.request(callback_url, nil, function(body, code) | 103 do_subscribe(subscription); |
113 if body == challenge and code > 199 and code < 300 then | 104 subs_by_topic[topic][callback] = subscription; |
114 if not subscription.want_state then | 105 else |
115 module:log("warn", "Verification of already verified request, probably"); | 106 module:log("warn", "status %d and body was %q", tostring(code), tostring(body)); |
116 return; | 107 subs_by_topic[topic][callback] = subscription; |
117 end | 108 end |
118 if mode == "unsubscribe" then | 109 end) |
119 subs_by_topic[topic][callback] = nil; | 110 return 202; |
120 else | |
121 subscription.state = subscription.want_state .. "d"; | |
122 subscription.want_state = nil; | |
123 module:log("debug", "calling do_subscribe()"); | |
124 do_subscribe(subscription); | |
125 subs_by_topic[topic][callback] = subscription; | |
126 end | |
127 else | |
128 subs_by_topic[topic][callback] = subscription; | |
129 end | |
130 response.status = 204; | |
131 response:send(); | |
132 end) | |
133 return true; | |
134 end | |
135 return 400; | |
136 else | 111 else |
137 response.status = 400; | 112 response.status = 400; |
138 response.headers.content_type = "text/html"; | 113 response.headers.content_type = "text/html"; |
139 return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n" | 114 return "<h1>Bad Request</h1>\n<a href='http://pubsubhubbub.googlecode.com/svn/trunk/pubsubhubbub-core-0.3.html#anchor5'>Missing required parameter(s)</a>\n" |
140 end | 115 end |
141 end | 116 end |
142 end | 117 end |
143 | 118 |
144 local function periodic() | 119 local function periodic(now) |
145 local now = time(); | |
146 local next_check = now + max_lease; | 120 local next_check = now + max_lease; |
147 local purge = false; | 121 local purge = false; |
148 for topic, callbacks in pairs(subs_by_topic) do | 122 for topic, callbacks in pairs(subs_by_topic) do |
149 for callback, subscription in pairs(callbacks) do | 123 for callback, subscription in pairs(callbacks) do |
150 if subscription.mode == "subscribed" then | 124 if subscription.mode == "subscribed" then |
151 if subscription.expires < now then | 125 if subscription.expires < now then |
152 -- Subscription has expired, drop it. | 126 -- Subscription has expired, drop it. |
153 purge = true; | 127 purge = true; |
154 elseif subscription.expires < now + min_lease then | |
155 -- Subscription set to expire soon, re-confirm it. | |
156 local challenge = uuid(); | |
157 local callback_url = callback .. (callback:match("%?") and "&" or "?") .. formencode{ | |
158 ["hub.mode"] = subscription.state, | |
159 ["hub.topic"] = topic, | |
160 ["hub.challenge"] = challenge, | |
161 ["hub.lease_seconds"] = subscription.lease_seconds, | |
162 ["hub.verify_token"] = subscription.verify_token, | |
163 } | |
164 http.request(callback_url, nil, function(body, code) | |
165 if body == challenge and code > 199 and code < 300 then | |
166 subscription.expires = now + subscription.lease_seconds; | |
167 end | |
168 end); | |
169 else | 128 else |
170 next_check = m_min(next_check, subscription.expires); | 129 next_check = m_min(next_check, subscription.expires); |
171 end | 130 end |
172 end | 131 end |
173 end | 132 end |
199 ["Content-Type"] = "application/xml", | 158 ["Content-Type"] = "application/xml", |
200 }; | 159 }; |
201 if subscription.secret then | 160 if subscription.secret then |
202 headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true); | 161 headers["X-Hub-Signature"] = "sha1="..hmac_sha1(subscription.secret, body, true); |
203 end | 162 end |
204 http.request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code) | 163 http_request(subscription.callback, { method = "POST", body = body, headers = headers }, function(body, code) |
205 if code >= 200 and code <= 299 then | 164 if code >= 200 and code <= 299 then |
206 module:log("debug", "Delivered"); | 165 module:log("debug", "Delivered"); |
207 else | 166 else |
208 module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback)); | 167 module:log("warn", "Got status code %d on delivery to %s", tonumber(code) or -1, tostring(subscription.callback)); |
209 -- TODO Retry | 168 -- TODO Retry |