Software /
code /
prosody-modules
Comparison
mod_storage_s3/mod_storage_s3.lua @ 5696:b17ba149b7c5
mod_storage_s3: Implement Archive storage
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 14 Oct 2023 21:44:14 +0200 |
parent | 5695:c74a96dc5d58 |
child | 5697:51d0311747fa |
comparison
equal
deleted
inserted
replaced
5695:c74a96dc5d58 | 5696:b17ba149b7c5 |
---|---|
1 local http = require "prosody.net.http"; | 1 local http = require "prosody.net.http"; |
2 local array = require "prosody.util.array"; | 2 local array = require "prosody.util.array"; |
3 local async = require "prosody.util.async"; | 3 local async = require "prosody.util.async"; |
4 local dt = require "prosody.util.datetime"; | |
4 local hashes = require "prosody.util.hashes"; | 5 local hashes = require "prosody.util.hashes"; |
5 local httputil = require "prosody.util.http"; | 6 local httputil = require "prosody.util.http"; |
7 local uuid = require "prosody.util.uuid"; | |
6 local it = require "prosody.util.iterators"; | 8 local it = require "prosody.util.iterators"; |
7 local jid = require "prosody.util.jid"; | 9 local jid = require "prosody.util.jid"; |
8 local json = require "prosody.util.json"; | 10 local json = require "prosody.util.json"; |
9 local st = require "prosody.util.stanza"; | 11 local st = require "prosody.util.stanza"; |
10 local xml = require "prosody.util.xml"; | 12 local xml = require "prosody.util.xml"; |
11 local url = require "socket.url"; | 13 local url = require "socket.url"; |
12 | 14 |
15 local new_uuid = uuid.v7 or uuid.generate; | |
13 local hmac_sha256 = hashes.hmac_sha256; | 16 local hmac_sha256 = hashes.hmac_sha256; |
14 local sha256 = hashes.sha256; | 17 local sha256 = hashes.sha256; |
15 | 18 |
16 local driver = {}; | 19 local driver = {}; |
17 | 20 |
112 return http.request(url.build(request), { method = method; headers = headers; body = payload }); | 115 return http.request(url.build(request), { method = method; headers = headers; body = payload }); |
113 end | 116 end |
114 | 117 |
115 -- coerce result back into Prosody data type | 118 -- coerce result back into Prosody data type |
116 local function on_result(response) | 119 local function on_result(response) |
120 if response.code >= 400 then | |
121 error(response.body); | |
122 end | |
117 local content_type = response.headers["content-type"]; | 123 local content_type = response.headers["content-type"]; |
118 if content_type == "application/json" then | 124 if content_type == "application/json" then |
119 return json.decode(response.body); | 125 return json.decode(response.body); |
120 elseif content_type == "application/xml" then | 126 elseif content_type == "application/xml" then |
121 return xml.parse(response.body); | 127 return xml.parse(response.body); |
170 return function() | 176 return function() |
171 return keys:pop(); | 177 return keys:pop(); |
172 end | 178 end |
173 end | 179 end |
174 | 180 |
-- Method table for archive-type stores. It is exposed on the driver as
-- `driver.archive`, a table whose __index is this method table (presumably
-- used as the metatable for opened archive stores -- confirm against the
-- driver's open function, which is not visible here).
-- `caps` is the store's capability table; nothing is declared in it yet.
local archive = { caps = {} };
driver.archive = { __index = archive };
186 | |
--- Build the absolute request path of a single archived object.
-- Layout: /bucket/host/store/username/with/date/key
-- @param username  account the archive belongs to
-- @param date      day segment to use as-is; when nil it is derived from `when`
-- @param when      timestamp handed to dt.date() when `date` is not given
-- @param with      JID of the other party; run through jid.prep then jid.escape
-- @param key       object id, used as the last path segment
-- @return the path produced by url.build_path
function archive:_path(username, date, when, with, key)
	-- Segments are computed in the same order in which they appear in the path.
	local host_segment = jid.escape(module.host);
	local store_segment = jid.escape(self.store);
	local user_segment = jid.escape(username);
	local with_segment = jid.escape(jid.prep(with));
	local day_segment = date or dt.date(when);
	return url.build_path({
		is_absolute = true;
		bucket;
		host_segment;
		store_segment;
		user_segment;
		with_segment;
		day_segment;
		key;
	});
end
199 | |
200 | |
--- Store one item in the archive.
-- PUT .../with/date/id
-- The item is wrapped in a <wrapper/> element together with an XEP-0203
-- <delay/> child that carries the full timestamp.
-- @param username  account the archive belongs to
-- @param key       id of the item; a fresh UUID is generated when nil
-- @param value     stanza to store
-- @param when      timestamp of the item
-- @param with      JID of the other party
-- @return whatever async.wait_for yields for the request promise; the
--         callback resolves to `key` on HTTP 200 and raises the response
--         body on any other status
function archive:append(username, key, value, when, with)
	local wrapper = st.stanza("wrapper");
	-- The exact timestamp lives inside the stored document; the object path
	-- only carries a date, because Minio had trouble with timestamps,
	-- probably the ':' characters, in paths.
	wrapper:tag("delay", { xmlns = "urn:xmpp:delay"; stamp = dt.datetime(when) }):up();
	wrapper:add_direct_child(value);
	key = key or new_uuid();

	local function on_put(response)
		if response.code ~= 200 then
			error(response.body);
		end
		return key;
	end

	local object_path = self:_path(username, nil, when, with, key);
	return async.wait_for(new_request("PUT", object_path, nil, wrapper):next(on_put));
end
216 | |
--- Query the archive of a user.
-- Lists the bucket under the prefix host/store/username[/with[/date]]/ and
-- filters the returned object keys. The objects themselves are fetched one at
-- a time as the returned iterator is advanced.
--
-- Object keys have the layout written by archive:_path (without the bucket):
--   host / store / username / with / date / id
-- so after url.parse_path the segments are [4]=with, [5]=date, [6]=id.
--
-- Recognised query fields: "with", "start", "end", "before", "after",
-- "reverse", "max". Date filtering is done on the date path segment and is
-- therefore only day-granular.
-- NOTE(review): "max" is sent as S3 max-keys and so limits the listing before
-- the filters here are applied -- confirm this is acceptable for callers.
--
-- @param username  account to search; nil selects the "@" pseudo-user
-- @param query     table of query fields, or nil for no restrictions
-- @return iterator yielding id, stanza, timestamp, with -- or nil, err when
--         the listing fails or `with` is not a valid JID
function archive:find(username, query)
	query = query or {};

	-- Normalise `with` once, the same way _path does when storing.
	local with = query["with"];
	if with then
		with = jid.prep(with);
		if not with then
			return nil, "invalid-jid";
		end
	end
	local start_date = query["start"] and dt.date(query["start"]);
	local end_date = query["end"] and dt.date(query["end"]);

	local bucket_path = url.build_path({ is_absolute = true; bucket; is_directory = true });
	local prefix = { jid.escape(module.host); jid.escape(self.store); is_directory = true };
	table.insert(prefix, jid.escape(username or "@"));
	if with then
		-- Must be the same segment _path writes, or the prefix never matches.
		table.insert(prefix, jid.escape(with));
		if start_date and start_date == end_date then
			-- The whole range falls on one day: narrow the listing to it.
			table.insert(prefix, start_date);
		end
	end

	prefix = url.build_path(prefix);
	local list_result, err = async.wait_for(new_request("GET", bucket_path, {
		prefix = prefix;
		["max-keys"] = query["max"] and tostring(query["max"]);
	}));
	if err or list_result.code ~= 200 then
		return nil, err or ("unexpected HTTP status " .. tostring(list_result.code));
	end
	local list_bucket_result = xml.parse(list_result.body);
	if list_bucket_result:get_child_text("IsTruncated") == "true" then
		local max_keys = list_bucket_result:get_child_text("MaxKeys");
		module:log("warn", "Paging truncated results not implemented, max %s %s returned", max_keys, self.store);
	end

	local keys = array();
	local iterwrap = function(...)
		return ...;
	end
	-- Work on local copies so the caller's query table is left untouched.
	local after_id, before_id = query["after"], query["before"];
	if query["reverse"] then
		-- Walking backwards, the roles of the two boundaries swap.
		after_id, before_id = before_id, after_id;
		iterwrap = it.reverse;
	end

	-- Without a starting boundary every entry is eligible from the beginning.
	local found = not after_id;
	for content in iterwrap(list_bucket_result:childtags("Contents")) do
		local key = url.parse_path(content:get_child_text("Key"));
		local item_with, item_date, item_id = key[4], key[5], key[6];
		-- Skip keys that do not have the full host/store/user/with/date/id shape.
		if item_with and item_date and item_id then
			if found and before_id ~= nil and before_id == item_id then
				break
			end
			if found
			and (not with or with == jid.unescape(item_with))
			and (not start_date or start_date <= item_date)
			and (not end_date or end_date >= item_date) then
				keys:push({ key = item_id; date = item_date; with = jid.unescape(item_with) });
			end
			if not found and item_id == after_id then
				found = true;
			end
		end
	end

	local i = 0;
	return function()
		i = i + 1;
		local item = keys[i];
		if item == nil then
			return nil;
		end
		local object_path = self:_path(username or "@", item.date, nil, item.with, item.key);
		local value, fetch_err = async.wait_for(new_request("GET", object_path):next(on_result));
		if not value then
			module:log("error", "%s", fetch_err);
			return nil;
		end
		-- The wrapper holds <delay/> first and the archived stanza second.
		local delay = value:get_child("delay", "urn:xmpp:delay");
		local when = delay and delay.attr.stamp and dt.parse(delay.attr.stamp) or nil;

		return item.key, value.tags[2], when, item.with;
	end
end
283 | |
--- Iterate over the users that have data in this store.
-- Reuses the key-value store's user listing on this store object and passes
-- it through it.unique.
-- @return the iterator returned by it.unique
function archive:users()
	local list_users = keyval.users;
	-- Called inline so that every value list_users returns reaches it.unique.
	return it.unique(list_users(self));
end
287 | |
288 --[[ TODO | |
289 function archive:delete(username, query) | |
290 return nil, "not-implemented"; | |
291 end | |
292 --]] | |
293 | |
-- Register the driver table with the module as a "storage" provider.
module:provides("storage", driver);