Comparison

mod_storage_s3/mod_storage_s3.lua @ 5696:b17ba149b7c5

mod_storage_s3: Implement Archive storage
author Kim Alvefur <zash@zash.se>
date Sat, 14 Oct 2023 21:44:14 +0200
parent 5695:c74a96dc5d58
child 5697:51d0311747fa
comparison
equal deleted inserted replaced
5695:c74a96dc5d58 5696:b17ba149b7c5
1 local http = require "prosody.net.http"; 1 local http = require "prosody.net.http";
2 local array = require "prosody.util.array"; 2 local array = require "prosody.util.array";
3 local async = require "prosody.util.async"; 3 local async = require "prosody.util.async";
4 local dt = require "prosody.util.datetime";
4 local hashes = require "prosody.util.hashes"; 5 local hashes = require "prosody.util.hashes";
5 local httputil = require "prosody.util.http"; 6 local httputil = require "prosody.util.http";
7 local uuid = require "prosody.util.uuid";
6 local it = require "prosody.util.iterators"; 8 local it = require "prosody.util.iterators";
7 local jid = require "prosody.util.jid"; 9 local jid = require "prosody.util.jid";
8 local json = require "prosody.util.json"; 10 local json = require "prosody.util.json";
9 local st = require "prosody.util.stanza"; 11 local st = require "prosody.util.stanza";
10 local xml = require "prosody.util.xml"; 12 local xml = require "prosody.util.xml";
11 local url = require "socket.url"; 13 local url = require "socket.url";
12 14
15 local new_uuid = uuid.v7 or uuid.generate;
13 local hmac_sha256 = hashes.hmac_sha256; 16 local hmac_sha256 = hashes.hmac_sha256;
14 local sha256 = hashes.sha256; 17 local sha256 = hashes.sha256;
15 18
16 local driver = {}; 19 local driver = {};
17 20
112 return http.request(url.build(request), { method = method; headers = headers; body = payload }); 115 return http.request(url.build(request), { method = method; headers = headers; body = payload });
113 end 116 end
114 117
115 -- coerce result back into Prosody data type 118 -- coerce result back into Prosody data type
116 local function on_result(response) 119 local function on_result(response)
120 if response.code >= 400 then
121 error(response.body);
122 end
117 local content_type = response.headers["content-type"]; 123 local content_type = response.headers["content-type"];
118 if content_type == "application/json" then 124 if content_type == "application/json" then
119 return json.decode(response.body); 125 return json.decode(response.body);
120 elseif content_type == "application/xml" then 126 elseif content_type == "application/xml" then
121 return xml.parse(response.body); 127 return xml.parse(response.body);
170 return function() 176 return function()
171 return keys:pop(); 177 return keys:pop();
172 end 178 end
173 end 179 end
174 180
181 local archive = {};
182 driver.archive = { __index = archive };
183
184 archive.caps = {
185 };
186
187 function archive:_path(username, date, when, with, key)
188 return url.build_path({
189 is_absolute = true;
190 bucket;
191 jid.escape(module.host);
192 jid.escape(self.store);
193 jid.escape(username);
194 jid.escape(jid.prep(with));
195 date or dt.date(when);
196 key;
197 })
198 end
199
200
201 -- PUT .../with/when/id
202 function archive:append(username, key, value, when, with)
203 local wrapper = st.stanza("wrapper");
204 -- Minio had trouble with timestamps, probably the ':' characters, in paths.
205 wrapper:tag("delay", { xmlns = "urn:xmpp:delay"; stamp = dt.datetime(when) }):up();
206 wrapper:add_direct_child(value);
207 key = key or new_uuid();
208 return async.wait_for(new_request("PUT", self:_path(username, nil, when, with, key), nil, wrapper):next(function(r)
209 if r.code == 200 then
210 return key;
211 else
212 error(r.body);
213 end
214 end));
215 end
216
217 function archive:find(username, query)
218 local bucket_path = url.build_path({ is_absolute = true; bucket; is_directory = true });
219 local prefix = { jid.escape(module.host); jid.escape(self.store); is_directory = true };
220 table.insert(prefix, jid.escape(username or "@"));
221 if query["with"] then
222 table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24));
223 if query["start"] and query["end"] and dt.date(query["start"]) == dt.date(query["end"]) then
224 table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24));
225 end
226 end
227
228 prefix = url.build_path(prefix);
229 local list_result, err = async.wait_for(new_request("GET", bucket_path, {
230 prefix = prefix;
231 ["max-keys"] = query["max"] and tostring(query["max"]);
232 }));
233 if err or list_result.code ~= 200 then
234 return nil, err;
235 end
236 local list_bucket_result = xml.parse(list_result.body);
237 if list_bucket_result:get_child_text("IsTruncated") == "true" then
238 local max_keys = list_bucket_result:get_child_text("MaxKeys");
239 module:log("warn", "Paging truncated results not implemented, max %s %s returned", max_keys, self.store);
240 end
241 local keys = array();
242 local iterwrap = function(...)
243 return ...;
244 end
245 if query["reverse"] then
246 query["before"], query["after"] = query["after"], query["before"];
247 iterwrap = it.reverse;
248 end
249 local found = not query["after"];
250 for content in iterwrap(list_bucket_result:childtags("Contents")) do
251 local key = url.parse_path(content:get_child_text("Key"));
252 if found and query["before"] == key[6] then
253 break
254 end
255 if (not query["with"] or query["with"] == jid.unescape(key[5]))
256 and (not query["start"] or dt.date(query["start"]) >= key[6])
257 and (not query["end"] or dt.date(query["end"]) <= key[6])
258 and found then
259 keys:push({ key = key[6]; date = key[5]; with = jid.unescape(key[4]) });
260 end
261 if not found and key[6] == query["after"] then
262 found = not found
263 end
264 end
265 local i = 0;
266 return function()
267 i = i + 1;
268 local item = keys[i];
269 if item == nil then
270 return nil;
271 end
272 -- luacheck: ignore 431/err
273 local value, err = async.wait_for(new_request("GET", self:_path(username or "@", item.date, nil, item.with, item.key)):next(on_result));
274 if not value then
275 module:log("error", "%s", err);
276 return nil;
277 end
278 local delay = value:get_child("delay", "urn:xmpp:delay");
279
280 return item.key, value.tags[2], dt.parse(delay.attr.stamp), item.with;
281 end
282 end
283
284 function archive:users()
285 return it.unique(keyval.users(self));
286 end
287
288 --[[ TODO
289 function archive:delete(username, query)
290 return nil, "not-implemented";
291 end
292 --]]
293
175 module:provides("storage", driver); 294 module:provides("storage", driver);