Changeset

5696:b17ba149b7c5

mod_storage_s3: Implement Archive storage
author Kim Alvefur <zash@zash.se>
date Sat, 14 Oct 2023 21:44:14 +0200
parents 5695:c74a96dc5d58
children 5697:51d0311747fa
files mod_storage_s3/mod_storage_s3.lua
diffstat 1 files changed, 119 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/mod_storage_s3/mod_storage_s3.lua	Sat Oct 14 21:41:01 2023 +0200
+++ b/mod_storage_s3/mod_storage_s3.lua	Sat Oct 14 21:44:14 2023 +0200
@@ -1,8 +1,10 @@
 local http = require "prosody.net.http";
 local array = require "prosody.util.array";
 local async = require "prosody.util.async";
+local dt = require "prosody.util.datetime";
 local hashes = require "prosody.util.hashes";
 local httputil = require "prosody.util.http";
+local uuid = require "prosody.util.uuid";
 local it = require "prosody.util.iterators";
 local jid = require "prosody.util.jid";
 local json = require "prosody.util.json";
@@ -10,6 +12,7 @@
 local xml = require "prosody.util.xml";
 local url = require "socket.url";
 
+local new_uuid = uuid.v7 or uuid.generate;
 local hmac_sha256 = hashes.hmac_sha256;
 local sha256 = hashes.sha256;
 
@@ -114,6 +117,9 @@
 
 -- coerce result back into Prosody data type
 local function on_result(response)
+	if response.code >= 400 then
+		error(response.body);
+	end
 	local content_type = response.headers["content-type"];
 	if content_type == "application/json" then
 		return json.decode(response.body);
@@ -172,4 +178,117 @@
 	end
 end
 
+local archive = {};
+driver.archive = { __index = archive };
+
+archive.caps = {
+};
+
+function archive:_path(username, date, when, with, key)
+	return url.build_path({
+		is_absolute = true;
+		bucket;
+		jid.escape(module.host);
+		jid.escape(self.store);
+		jid.escape(username);
+		jid.escape(jid.prep(with));
+		date or dt.date(when);
+		key;
+	})
+end
+
+
+-- PUT .../with/when/id
+function archive:append(username, key, value, when, with)
+	local wrapper = st.stanza("wrapper");
+	-- Minio had trouble with timestamps, probably the ':' characters, in paths.
+	wrapper:tag("delay", { xmlns = "urn:xmpp:delay"; stamp = dt.datetime(when) }):up();
+	wrapper:add_direct_child(value);
+	key = key or new_uuid();
+	return async.wait_for(new_request("PUT", self:_path(username, nil, when, with, key), nil, wrapper):next(function(r)
+		if r.code == 200 then
+			return key;
+		else
+			error(r.body);
+		end
+	end));
+end
+
+function archive:find(username, query)
+	local bucket_path = url.build_path({ is_absolute = true; bucket; is_directory = true });
+	local prefix = { jid.escape(module.host); jid.escape(self.store); is_directory = true };
+	table.insert(prefix, jid.escape(username or "@"));
+	if query["with"] then
+		table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24));
+		if query["start"] and query["end"] and dt.date(query["start"]) == dt.date(query["end"]) then
+			table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24));
+		end
+	end
+
+	prefix = url.build_path(prefix);
+	local list_result, err = async.wait_for(new_request("GET", bucket_path, {
+		prefix = prefix;
+		["max-keys"] = query["max"] and tostring(query["max"]);
+	}));
+	if err or list_result.code ~= 200 then
+		return nil, err;
+	end
+	local list_bucket_result = xml.parse(list_result.body);
+	if list_bucket_result:get_child_text("IsTruncated") == "true" then
+		local max_keys = list_bucket_result:get_child_text("MaxKeys");
+		module:log("warn", "Paging truncated results not implemented, max %s %s returned", max_keys, self.store);
+	end
+	local keys = array();
+	local iterwrap = function(...)
+		return ...;
+	end
+	if query["reverse"] then
+		query["before"], query["after"] = query["after"], query["before"];
+		iterwrap = it.reverse;
+	end
+	local found = not query["after"];
+	for content in iterwrap(list_bucket_result:childtags("Contents")) do
+		local key = url.parse_path(content:get_child_text("Key"));
+		if found and query["before"] == key[6] then
+			break
+		end
+		if (not query["with"] or query["with"] == jid.unescape(key[5]))
+		and (not query["start"] or dt.date(query["start"]) >= key[6])
+		and (not query["end"] or dt.date(query["end"]) <= key[6])
+		and found then
+			keys:push({ key = key[6]; date = key[5]; with = jid.unescape(key[4]) });
+		end
+		if not found and key[6] == query["after"] then
+			found = not found
+		end
+	end
+	local i = 0;
+	return function()
+		i = i + 1;
+		local item = keys[i];
+		if item == nil then
+			return nil;
+		end
+		-- luacheck: ignore 431/err
+		local value, err = async.wait_for(new_request("GET", self:_path(username or "@", item.date, nil, item.with, item.key)):next(on_result));
+		if not value then
+			module:log("error", "%s", err);
+			return nil;
+		end
+		local delay = value:get_child("delay", "urn:xmpp:delay");
+
+		return item.key, value.tags[2], dt.parse(delay.attr.stamp), item.with;
+	end
+end
+
+function archive:users()
+	return it.unique(keyval.users(self));
+end
+
+--[[ TODO
+function archive:delete(username, query)
+	return nil, "not-implemented";
+end
+--]]
+
 module:provides("storage", driver);