Software /
code /
prosody-modules
Comparison
mod_storage_xmlarchive/mod_storage_xmlarchive.lua @ 1690:8c0fbc685364
mod_storage_xmlarchive: New stanza archive-only storage module backed by plain text files
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sun, 03 May 2015 13:45:42 +0200 |
child | 1726:160c35d2a5a2 |
comparison
equal
deleted
inserted
replaced
1689:06f9ab0c078c | 1690:8c0fbc685364 |
---|---|
1 local dm = require "core.storagemanager".olddm; | |
2 local hmac_sha256 = require"util.hashes".hmac_sha256; | |
3 local st = require"util.stanza"; | |
4 local dt = require"util.datetime"; | |
5 local new_stream = require "util.xmppstream".new; | |
6 local empty = {}; | |
7 | |
--- Pure-Lua stand-in for fallocate: reserve `len` bytes in an open file.
-- The space is reserved by writing `len` space characters at the current
-- position, after which the file position is moved back to `offset`.
-- The caller must already have positioned `f` at `offset`.
-- @param f      open file handle
-- @param offset byte position to return to after the padding is written
-- @param len    number of bytes to reserve
-- @return true on success; otherwise the falsy result and message from f:write
local function fallocate(f, offset, len)
	local written, err = f:write(string.rep(" ", len));
	if written then
		-- Rewind so the caller can overwrite the padding with real data.
		f:seek("set", offset);
		return true;
	end
	return written, err;
end;
-- Prefer the fallocate exported by util.pposix when that module can be
-- loaded and provides one; any error while loading it is swallowed by pcall
-- and the previously defined fallocate stays in place.
pcall(function()
	local native = require("util.pposix").fallocate;
	if native then
		fallocate = native;
	end
end);
22 | |
local archive = {};
local archive_mt = { __index = archive };

--- Append one stanza to the per-day XML file of a user's archive.
-- The serialized stanza is written at the end of "<username>@<day>.xml" and
-- an index entry { id, when, with, offset, length } is appended to the list
-- stored under the same "<username>@<day>" key. When the day's file has to
-- be created, the day is also appended to the user's list of dates.
-- @param username archive owner; "@" is used when nil
-- @param _        unused
-- @param when     timestamp; passed to dt.date() to pick the day file and
--                 stored in the index entry
-- @param with     stored in the index entry as-is
-- @param data     must carry the st.stanza_mt metatable
-- @return the new item id on success, or nil plus an error message
function archive:append(username, _, when, with, data)
	if getmetatable(data) ~= st.stanza_mt then
		return nil, "unsupported-datatype";
	end
	username = username or "@";
	data = tostring(data) .. "\n";
	local day = dt.date(when);
	local filename = dm.getpath(username.."@"..day, module.host, self.store, "xml", true);
	local ok, err;
	local f = io.open(filename, "r+");
	if not f then
		-- Opening for update failed, so try to create the day's file.
		f, err = io.open(filename, "w");
		if not f then return nil, err; end
		ok, err = dm.list_append(username, module.host, self.store, day);
		if not ok then
			-- Do not leave an unregistered file behind: a later append would
			-- open it with "r+" and never add the day to the date index.
			f:close();
			os.remove(filename);
			return nil, err;
		end
	end
	local offset;
	offset, err = f:seek("end");
	if not offset then
		f:close();
		return nil, err;
	end
	ok, err = fallocate(f, offset, #data);
	if not ok then
		f:close();
		return nil, err;
	end
	-- Make sure we are back at the reserved region before writing into it.
	f:seek("set", offset);
	ok, err = f:write(data);
	if not ok then
		f:close();
		return nil, err;
	end
	ok, err = f:close();
	if not ok then return nil, err; end
	-- The id is the day followed by the last 16 characters of an HMAC-SHA256
	-- computed from the item's location (user, day, offset) and its text.
	local id = day .. "-" .. hmac_sha256(username.."@"..day.."+"..offset, data, true):sub(-16);
	ok, err = dm.list_append(username.."@"..day, module.host, self.store, { id = id, when = when, with = with, offset = offset, length = #data });
	if not ok then return nil, err; end
	return id;
end
55 | |
--- Query a user's archive and return an iterator over the matching items.
-- Recognised query fields (all optional): limit, reverse, after, before,
-- start, ["end"] and with. `after` / `before` are item ids whose first ten
-- characters are the day; `start` / `end` are compared against item.when.
-- @param username archive owner; "@" is used when nil
-- @param query    table of query fields; treated as empty when nil
-- @return an iterator function; each call returns id, stanza, when, with
--         for the next matching item, or nothing when iteration is over
function archive:find(username, query)
	username = username or "@";
	query = query or empty;

	-- The stream parser hands each parsed stanza to cb(); the iterator picks
	-- it up from `result` right after feeding one stored chunk.
	local result;
	local function cb(_, stanza)
		if result then
			module:log("warn", "Multiple items in chunk");
		end
		result = stanza;
	end

	local stream_sess = { notopen = true };
	local stream = new_stream(stream_sess, { handlestanza = cb, stream_ns = "jabber:client"});
	local dates = dm.list_load(username, module.host, self.store) or empty;
	-- Open a dummy stream so stored stanzas can be fed in one at a time.
	stream:feed(st.stanza("stream", { xmlns = "jabber:client" }):top_tag());
	stream_sess.notopen = nil;

	local limit = query.limit;
	local start_day, step, last_day = 1, 1, #dates;
	local count = 0;
	local rev = query.reverse;
	-- NOTE(review): in_range starts out false whenever query.after or
	-- query.before is given and is only switched on below when the id of the
	-- "opening" boundary is seen. A forward query with only `before` (or a
	-- reverse query with only `after`) therefore appears to yield nothing.
	-- TODO confirm this is intended.
	local in_range = not (query.after or query.before);
	if query.after or query.start then
		local d = query.after and query.after:sub(1, 10) or dt.date(query.start);
		for i = 1, #dates do
			if dates[i] == d then
				start_day = i; break;
			end
		end
	end
	if query.before or query["end"] then
		local d = query.before and query.before:sub(1, 10) or dt.date(query["end"]);
		for i = #dates, 1, -1 do
			if dates[i] == d then
				last_day = i; break;
			end
		end
	end
	if rev then
		start_day, step, last_day = last_day, -step, start_day;
	end
	local items, xmlfile;
	local first_item, last_item;

	-- Close the currently open day file, if any. Safe to call repeatedly.
	local function close_xmlfile()
		if xmlfile then
			xmlfile:close();
			xmlfile = nil;
		end
	end

	return function ()
		if limit and count >= limit then
			-- xmlfile may be nil here (limit of 0, or the iterator being
			-- called again after it finished), so never index it blindly.
			close_xmlfile();
			return;
		end

		for d = start_day, last_day, step do
			if d ~= start_day or not items then
				module:log("debug", "Load items for %s", dates[d]);
				-- Remember the current day so the next call resumes here.
				start_day = d;
				items = dm.list_load(username .. "@" .. dates[d], module.host, self.store) or empty;
				if not rev then
					first_item, last_item = 1, #items;
				else
					first_item, last_item = #items, 1;
				end
				-- Release the previous day's handle before opening the next.
				close_xmlfile();
				local ferr;
				xmlfile, ferr = io.open(dm.getpath(username .. "@" .. dates[d], module.host, self.store, "xml"));
				if not xmlfile then
					module:log("error", "Error: %s", ferr);
					return;
				end
			end

			for i = first_item, last_item, step do
				module:log("debug", "data[%q][%d]", dates[d], i);
				local item = items[i];
				if not item then
					module:log("debug", "data[%q][%d] is nil", dates[d], i);
					break;
				end
				if xmlfile and in_range
				and (not query.with or item.with == query.with)
				and (not query.start or item.when >= query.start)
				and (not query["end"] or item.when <= query["end"]) then
					count = count + 1;
					-- Resume after this item on the next call.
					first_item = i + step;

					xmlfile:seek("set", item.offset);
					local data = xmlfile:read(item.length);
					local ok, err = stream:feed(data);
					if not ok then
						module:log("warn", "Parse error: %s", err);
					end
					if result then
						local stanza = result;
						result = nil;
						-- NOTE(review): returning here skips the boundary
						-- checks below for this item; verify that a closing
						-- boundary id is still honoured in that case.
						return item.id, stanza, item.when, item.with;
					end
				end
				if (rev and item.id == query.after) or
					(not rev and item.id == query.before) then
					-- Closing boundary reached: stop yielding further items.
					in_range = false;
					limit = count;
				end
				if (rev and item.id == query.before) or
					(not rev and item.id == query.after) then
					-- Opening boundary reached: following items are in range.
					in_range = true;
				end
			end
		end
		close_xmlfile();
	end
end
165 | |
--- Trim a user's archive by dropping whole days from its oldest end.
-- Only day granularity is supported: queries using `with`, `start` or
-- `after` are rejected. The cut-off comes from query.before or query["end"]
-- (a number is converted with dt.date(), a string is cut to its first ten
-- characters); with neither given, "9999-12-31" is used.
-- @param username archive owner; "@" is used when nil
-- @param query    table of query fields; treated as empty when nil
-- @return true on success; nil, "not-implemented" for unsupported queries;
--         or the failing result and error from dm.list_store
function archive:delete(username, query)
	username = username or "@";
	query = query or empty;
	if query.with or query.start or query.after then
		return nil, "not-implemented"; -- Only trimming the oldest messages
	end

	local cutoff = query.before or query["end"] or "9999-12-31";
	if type(cutoff) == "number" then
		cutoff = dt.date(cutoff);
	else
		cutoff = cutoff:sub(1, 10);
	end

	-- Split the known days into those to keep and those to drop.
	local all_days = dm.list_load(username, module.host, self.store) or empty;
	local kept, expired = {}, {};
	for idx = 1, #all_days do
		local day = all_days[idx];
		if day >= cutoff then
			kept[#kept + 1] = day;
		else
			expired[#expired + 1] = day;
		end
	end
	table.sort(kept);

	-- Persist the shortened date index first, then delete the day files.
	local stored, store_err = dm.list_store(username, module.host, self.store, kept);
	if not stored then
		return stored, store_err;
	end
	for idx = 1, #expired do
		local key = username .. "@" .. expired[idx];
		os.remove(dm.getpath(key, module.host, self.store, "list"));
		os.remove(dm.getpath(key, module.host, self.store, "xml"));
	end
	return true;
end
192 | |
-- Storage provider exposed to Prosody; only "archive" stores are offered.
local provider = {};

--- Open a store handle backed by this module.
-- @param store store name, kept on the returned object as `store`
-- @param typ   requested store type; only "archive" is accepted
-- @return an object using archive_mt, or nil, "unsupported-store"
function provider:open(store, typ)
	if typ == "archive" then
		return setmetatable({ store = store }, archive_mt);
	end
	return nil, "unsupported-store";
end

module:provides("storage", provider);