Comparison

plugins/mod_storage_internal.lua @ 11120:b2331f3dfeea

Merge 0.11->trunk
author Matthew Wild <mwild1@gmail.com>
date Wed, 30 Sep 2020 09:50:33 +0100
parent 10926:c55bd98a54f8
child 11275:b8fada57faf0
comparison
equal deleted inserted replaced
11119:68df52bf08d5 11120:b2331f3dfeea
1 local cache = require "util.cache";
1 local datamanager = require "core.storagemanager".olddm; 2 local datamanager = require "core.storagemanager".olddm;
2 local array = require "util.array"; 3 local array = require "util.array";
3 local datetime = require "util.datetime"; 4 local datetime = require "util.datetime";
4 local st = require "util.stanza"; 5 local st = require "util.stanza";
5 local now = require "util.time".now; 6 local now = require "util.time".now;
6 local id = require "util.id".medium; 7 local id = require "util.id".medium;
8 local jid_join = require "util.jid".join;
7 9
8 local host = module.host; 10 local host = module.host;
11
12 local archive_item_limit = module:get_option_number("storage_archive_item_limit", 10000);
13 local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
9 14
10 local driver = {}; 15 local driver = {};
11 16
12 function driver:open(store, typ) 17 function driver:open(store, typ)
13 local mt = self[typ or "keyval"] 18 local mt = self[typ or "keyval"]
40 return datamanager.users(host, self.store, self.type); 45 return datamanager.users(host, self.store, self.type);
41 end 46 end
42 47
43 local archive = {}; 48 local archive = {};
44 driver.archive = { __index = archive }; 49 driver.archive = { __index = archive };
50
51 archive.caps = {
52 total = true;
53 quota = archive_item_limit;
54 truncate = true;
55 };
45 56
46 function archive:append(username, key, value, when, with) 57 function archive:append(username, key, value, when, with)
47 when = when or now(); 58 when = when or now();
48 if not st.is_stanza(value) then 59 if not st.is_stanza(value) then
49 return nil, "unsupported-datatype"; 60 return nil, "unsupported-datatype";
52 value.when = when; 63 value.when = when;
53 value.with = with; 64 value.with = with;
54 value.attr.stamp = datetime.datetime(when); 65 value.attr.stamp = datetime.datetime(when);
55 value.attr.stamp_legacy = datetime.legacy(when); 66 value.attr.stamp_legacy = datetime.legacy(when);
56 67
68 local cache_key = jid_join(username, host, self.store);
69 local item_count = archive_item_count_cache:get(cache_key);
70
57 if key then 71 if key then
58 local items, err = datamanager.list_load(username, host, self.store); 72 local items, err = datamanager.list_load(username, host, self.store);
59 if not items and err then return items, err; end 73 if not items and err then return items, err; end
74
75 -- Check the quota
76 item_count = items and #items or 0;
77 archive_item_count_cache:set(cache_key, item_count);
78 if item_count >= archive_item_limit then
79 module:log("debug", "%s reached or over quota, not adding to store", username);
80 return nil, "quota-limit";
81 end
82
60 if items then 83 if items then
84 -- Filter out any item with the same key as the one being added
61 items = array(items); 85 items = array(items);
62 items:filter(function (item) 86 items:filter(function (item)
63 return item.key ~= key; 87 return item.key ~= key;
64 end); 88 end);
89
65 value.key = key; 90 value.key = key;
66 items:push(value); 91 items:push(value);
67 local ok, err = datamanager.list_store(username, host, self.store, items); 92 local ok, err = datamanager.list_store(username, host, self.store, items);
68 if not ok then return ok, err; end 93 if not ok then return ok, err; end
94 archive_item_count_cache:set(cache_key, #items);
69 return key; 95 return key;
70 end 96 end
71 else 97 else
98 if not item_count then -- Item count not cached?
99 -- We need to load the list to get the number of items currently stored
100 local items, err = datamanager.list_load(username, host, self.store);
101 if not items and err then return items, err; end
102 item_count = items and #items or 0;
103 archive_item_count_cache:set(cache_key, item_count);
104 end
105 if item_count >= archive_item_limit then
106 module:log("debug", "%s reached or over quota, not adding to store", username);
107 return nil, "quota-limit";
108 end
72 key = id(); 109 key = id();
73 end 110 end
111
112 module:log("debug", "%s has %d items out of %d limit in store %s", username, item_count, archive_item_limit, self.store);
74 113
75 value.key = key; 114 value.key = key;
76 115
77 local ok, err = datamanager.list_append(username, host, self.store, value); 116 local ok, err = datamanager.list_append(username, host, self.store, value);
78 if not ok then return ok, err; end 117 if not ok then return ok, err; end
118 archive_item_count_cache:set(cache_key, item_count+1);
79 return key; 119 return key;
80 end 120 end
81 121
82 function archive:find(username, query) 122 function archive:find(username, query)
83 local items, err = datamanager.list_load(username, host, self.store); 123 local items, err = datamanager.list_load(username, host, self.store);
84 if not items then 124 if not items then
85 if err then 125 if err then
86 return items, err; 126 return items, err;
87 else 127 elseif query then
88 return function () end, 0; 128 if query.before or query.after then
89 end 129 return nil, "item-not-found";
90 end 130 end
91 local count = #items; 131 if query.total then
92 local i = 0; 132 return function () end, 0;
133 end
134 end
135 return function () end;
136 end
137 local count = nil;
138 local i, last_key = 0;
93 if query then 139 if query then
94 items = array(items); 140 items = array(items);
95 if query.key then 141 if query.key then
96 items:filter(function (item) 142 items:filter(function (item)
97 return item.key == query.key; 143 return item.key == query.key;
112 items:filter(function (item) 158 items:filter(function (item)
113 local when = item.when or datetime.parse(item.attr.stamp); 159 local when = item.when or datetime.parse(item.attr.stamp);
114 return when <= query["end"]; 160 return when <= query["end"];
115 end); 161 end);
116 end 162 end
117 count = #items; 163 if query.total then
164 count = #items;
165 end
118 if query.reverse then 166 if query.reverse then
119 items:reverse(); 167 items:reverse();
120 if query.before then 168 if query.before then
121 for j = 1, count do 169 local found = false;
170 for j = 1, #items do
122 if (items[j].key or tostring(j)) == query.before then 171 if (items[j].key or tostring(j)) == query.before then
172 found = true;
123 i = j; 173 i = j;
124 break; 174 break;
125 end 175 end
126 end 176 end
127 end 177 if not found then
178 return nil, "item-not-found";
179 end
180 end
181 elseif query.before then
182 last_key = query.before;
128 elseif query.after then 183 elseif query.after then
129 for j = 1, count do 184 local found = false;
185 for j = 1, #items do
130 if (items[j].key or tostring(j)) == query.after then 186 if (items[j].key or tostring(j)) == query.after then
187 found = true;
131 i = j; 188 i = j;
132 break; 189 break;
133 end 190 end
191 end
192 if not found then
193 return nil, "item-not-found";
134 end 194 end
135 end 195 end
136 if query.limit and #items - i > query.limit then 196 if query.limit and #items - i > query.limit then
137 items[i+query.limit+1] = nil; 197 items[i+query.limit+1] = nil;
138 end 198 end
139 end 199 end
140 return function () 200 return function ()
141 i = i + 1; 201 i = i + 1;
142 local item = items[i]; 202 local item = items[i];
143 if not item then return; end 203 if not item or (last_key and item.key == last_key) then
204 return;
205 end
144 local key = item.key or tostring(i); 206 local key = item.key or tostring(i);
145 local when = item.when or datetime.parse(item.attr.stamp); 207 local when = item.when or datetime.parse(item.attr.stamp);
146 local with = item.with; 208 local with = item.with;
147 item.key, item.when, item.with = nil, nil, nil; 209 item.key, item.when, item.with = nil, nil, nil;
148 item.attr.stamp = nil; 210 item.attr.stamp = nil;
150 item = st.deserialize(item); 212 item = st.deserialize(item);
151 return key, item, when, with; 213 return key, item, when, with;
152 end, count; 214 end, count;
153 end 215 end
154 216
217 function archive:get(username, wanted_key)
218 local iter, err = self:find(username, { key = wanted_key })
219 if not iter then return iter, err; end
220 for key, stanza, when, with in iter do
221 if key == wanted_key then
222 return stanza, when, with;
223 end
224 end
225 return nil, "item-not-found";
226 end
227
228 function archive:set(username, key, new_value, new_when, new_with)
229 local items, err = datamanager.list_load(username, host, self.store);
230 if not items then
231 if err then
232 return items, err;
233 else
234 return nil, "item-not-found";
235 end
236 end
237
238 for i = 1, #items do
239 local old_item = items[i];
240 if old_item.key == key then
241 local item = st.preserialize(st.clone(new_value));
242
243 local when = new_when or old_item.when or datetime.parse(old_item.attr.stamp);
244 item.key = key;
245 item.when = when;
246 item.with = new_with or old_item.with;
247 item.attr.stamp = datetime.datetime(when);
248 item.attr.stamp_legacy = datetime.legacy(when);
249 items[i] = item;
250 return datamanager.list_store(username, host, self.store, items);
251 end
252 end
253
254 return nil, "item-not-found";
255 end
256
155 function archive:dates(username) 257 function archive:dates(username)
156 local items, err = datamanager.list_load(username, host, self.store); 258 local items, err = datamanager.list_load(username, host, self.store);
157 if not items then return items, err; end 259 if not items then return items, err; end
158 return array(items):pluck("when"):map(datetime.date):unique(); 260 return array(items):pluck("when"):map(datetime.date):unique();
159 end 261 end
160 262
263 function archive:summary(username, query)
264 local iter, err = self:find(username, query)
265 if not iter then return iter, err; end
266 local counts = {};
267 local earliest = {};
268 local latest = {};
269 local body = {};
270 for _, stanza, when, with in iter do
271 counts[with] = (counts[with] or 0) + 1;
272 if earliest[with] == nil then
273 earliest[with] = when;
274 end
275 latest[with] = when;
276 body[with] = stanza:get_child_text("body") or body[with];
277 end
278 return {
279 counts = counts;
280 earliest = earliest;
281 latest = latest;
282 body = body;
283 };
284 end
285
286 function archive:users()
287 return datamanager.users(host, self.store, "list");
288 end
289
161 function archive:delete(username, query) 290 function archive:delete(username, query)
291 local cache_key = jid_join(username, host, self.store);
162 if not query or next(query) == nil then 292 if not query or next(query) == nil then
293 archive_item_count_cache:set(cache_key, nil);
163 return datamanager.list_store(username, host, self.store, nil); 294 return datamanager.list_store(username, host, self.store, nil);
164 end 295 end
165 local items, err = datamanager.list_load(username, host, self.store); 296 local items, err = datamanager.list_load(username, host, self.store);
166 if not items then 297 if not items then
167 if err then 298 if err then
168 return items, err; 299 return items, err;
169 end 300 end
301 archive_item_count_cache:set(cache_key, 0);
170 -- Store is empty 302 -- Store is empty
171 return 0; 303 return 0;
172 end 304 end
173 items = array(items); 305 items = array(items);
174 local count_before = #items; 306 local count_before = #items;
214 if count == 0 then 346 if count == 0 then
215 return 0; -- No changes, skip write 347 return 0; -- No changes, skip write
216 end 348 end
217 local ok, err = datamanager.list_store(username, host, self.store, items); 349 local ok, err = datamanager.list_store(username, host, self.store, items);
218 if not ok then return ok, err; end 350 if not ok then return ok, err; end
351 archive_item_count_cache:set(cache_key, #items);
219 return count; 352 return count;
220 end 353 end
221 354
222 module:provides("storage", driver); 355 module:provides("storage", driver);