Software /
code /
prosody-modules
Changeset
5696:b17ba149b7c5
mod_storage_s3: Implement Archive storage
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 14 Oct 2023 21:44:14 +0200 |
parents | 5695:c74a96dc5d58 |
children | 5697:51d0311747fa |
files | mod_storage_s3/mod_storage_s3.lua |
diffstat | 1 files changed, 119 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/mod_storage_s3/mod_storage_s3.lua Sat Oct 14 21:41:01 2023 +0200 +++ b/mod_storage_s3/mod_storage_s3.lua Sat Oct 14 21:44:14 2023 +0200 @@ -1,8 +1,10 @@ local http = require "prosody.net.http"; local array = require "prosody.util.array"; local async = require "prosody.util.async"; +local dt = require "prosody.util.datetime"; local hashes = require "prosody.util.hashes"; local httputil = require "prosody.util.http"; +local uuid = require "prosody.util.uuid"; local it = require "prosody.util.iterators"; local jid = require "prosody.util.jid"; local json = require "prosody.util.json"; @@ -10,6 +12,7 @@ local xml = require "prosody.util.xml"; local url = require "socket.url"; +local new_uuid = uuid.v7 or uuid.generate; local hmac_sha256 = hashes.hmac_sha256; local sha256 = hashes.sha256; @@ -114,6 +117,9 @@ -- coerce result back into Prosody data type local function on_result(response) + if response.code >= 400 then + error(response.body); + end local content_type = response.headers["content-type"]; if content_type == "application/json" then return json.decode(response.body); @@ -172,4 +178,117 @@ end end +local archive = {}; +driver.archive = { __index = archive }; + +archive.caps = { +}; + +function archive:_path(username, date, when, with, key) + return url.build_path({ + is_absolute = true; + bucket; + jid.escape(module.host); + jid.escape(self.store); + jid.escape(username); + jid.escape(jid.prep(with)); + date or dt.date(when); + key; + }) +end + + +-- PUT .../with/when/id +function archive:append(username, key, value, when, with) + local wrapper = st.stanza("wrapper"); + -- Minio had trouble with timestamps, probably the ':' characters, in paths. + wrapper:tag("delay", { xmlns = "urn:xmpp:delay"; stamp = dt.datetime(when) }):up(); + wrapper:add_direct_child(value); + key = key or new_uuid(); + return async.wait_for(new_request("PUT", self:_path(username, nil, when, with, key), nil, wrapper):next(function(r) + if r.code == 200 then + return key; + else + error(r.body); + end + end)); +end + +function archive:find(username, query) + local bucket_path = url.build_path({ is_absolute = true; bucket; is_directory = true }); + local prefix = { jid.escape(module.host); jid.escape(self.store); is_directory = true }; + table.insert(prefix, jid.escape(username or "@")); + if query["with"] then + table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24)); + if query["start"] and query["end"] and dt.date(query["start"]) == dt.date(query["end"]) then + table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24)); + end + end + + prefix = url.build_path(prefix); + local list_result, err = async.wait_for(new_request("GET", bucket_path, { + prefix = prefix; + ["max-keys"] = query["max"] and tostring(query["max"]); + })); + if err or list_result.code ~= 200 then + return nil, err; + end + local list_bucket_result = xml.parse(list_result.body); + if list_bucket_result:get_child_text("IsTruncated") == "true" then + local max_keys = list_bucket_result:get_child_text("MaxKeys"); + module:log("warn", "Paging truncated results not implemented, max %s %s returned", max_keys, self.store); + end + local keys = array(); + local iterwrap = function(...) + return ...; + end + if query["reverse"] then + query["before"], query["after"] = query["after"], query["before"]; + iterwrap = it.reverse; + end + local found = not query["after"]; + for content in iterwrap(list_bucket_result:childtags("Contents")) do + local key = url.parse_path(content:get_child_text("Key")); + if found and query["before"] == key[6] then + break + end + if (not query["with"] or query["with"] == jid.unescape(key[5])) + and (not query["start"] or dt.date(query["start"]) >= key[6]) + and (not query["end"] or dt.date(query["end"]) <= key[6]) + and found then + keys:push({ key = key[6]; date = key[5]; with = jid.unescape(key[4]) }); + end + if not found and key[6] == query["after"] then + found = not found + end + end + local i = 0; + return function() + i = i + 1; + local item = keys[i]; + if item == nil then + return nil; + end + -- luacheck: ignore 431/err + local value, err = async.wait_for(new_request("GET", self:_path(username or "@", item.date, nil, item.with, item.key)):next(on_result)); + if not value then + module:log("error", "%s", err); + return nil; + end + local delay = value:get_child("delay", "urn:xmpp:delay"); + + return item.key, value.tags[2], dt.parse(delay.attr.stamp), item.with; + end +end + +function archive:users() + return it.unique(keyval.users(self)); +end + +--[[ TODO +function archive:delete(username, query) + return nil, "not-implemented"; +end +--]] + module:provides("storage", driver);