Comparison

plugins/mod_storage_internal.lua @ 13135:3fd24e1945b0

mod_storage_internal: Lazy-load archive items while iterating. Very large list files previously ran into limits of the Lua parser, or just caused Prosody to freeze while parsing. Using the new index, we can parse individual items one at a time. This probably won't reduce overall CPU usage (probably the opposite), but it will reduce the number of items in memory at once and allow collection of items after we have iterated past them.
author Kim Alvefur <zash@zash.se>
date Wed, 12 May 2021 01:25:44 +0200
parent 12977:74b9e05af71e
child 13136:396db0e7084f
comparison
equal deleted inserted replaced
13134:638f627e707f 13135:3fd24e1945b0
5 local st = require "prosody.util.stanza"; 5 local st = require "prosody.util.stanza";
6 local now = require "prosody.util.time".now; 6 local now = require "prosody.util.time".now;
7 local id = require "prosody.util.id".medium; 7 local id = require "prosody.util.id".medium;
8 local jid_join = require "prosody.util.jid".join; 8 local jid_join = require "prosody.util.jid".join;
9 local set = require "prosody.util.set"; 9 local set = require "prosody.util.set";
10 local it = require "prosody.util.iterators";
10 11
11 local host = module.host; 12 local host = module.host;
12 13
13 local archive_item_limit = module:get_option_number("storage_archive_item_limit", 10000); 14 local archive_item_limit = module:get_option_number("storage_archive_item_limit", 10000);
14 local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000)); 15 local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
120 archive_item_count_cache:set(cache_key, item_count+1); 121 archive_item_count_cache:set(cache_key, item_count+1);
121 return key; 122 return key;
122 end 123 end
123 124
124 function archive:find(username, query) 125 function archive:find(username, query)
125 local items, err = datamanager.list_load(username, host, self.store); 126 local list, err = datamanager.list_open(username, host, self.store);
126 if not items then 127 if not list then
127 if err then 128 if err then
128 return items, err; 129 return list, err;
129 elseif query then 130 elseif query then
130 if query.before or query.after then 131 if query.before or query.after then
131 return nil, "item-not-found"; 132 return nil, "item-not-found";
132 end 133 end
133 if query.total then 134 if query.total then
134 return function () end, 0; 135 return function()
136 end, 0;
135 end 137 end
136 end 138 end
137 return function () end; 139 return function()
138 end 140 end;
139 local count = nil; 141 end
140 local i, last_key = 0; 142
143 local i = 0;
144 local iter = function()
145 i = i + 1;
146 return list[i]
147 end
148
141 if query then 149 if query then
142 items = array(items); 150 if query.reverse then
151 i = #list + 1
152 iter = function()
153 i = i - 1
154 return list[i]
155 end
156 end
143 if query.key then 157 if query.key then
144 items:filter(function (item) 158 iter = it.filter(function(item)
145 return item.key == query.key; 159 return item.key == query.key;
146 end); 160 end, iter);
147 end 161 end
148 if query.ids then 162 if query.ids then
149 local ids = set.new(query.ids); 163 local ids = set.new(query.ids);
150 items:filter(function (item) 164 iter = it.filter(function(item)
151 return ids:contains(item.key); 165 return ids:contains(item.key);
152 end); 166 end, iter);
153 end 167 end
154 if query.with then 168 if query.with then
155 items:filter(function (item) 169 iter = it.filter(function(item)
156 return item.with == query.with; 170 return item.with == query.with;
157 end); 171 end, iter);
158 end 172 end
159 if query.start then 173 if query.start then
160 items:filter(function (item) 174 iter = it.filter(function(item)
161 local when = item.when or datetime.parse(item.attr.stamp); 175 local when = item.when or datetime.parse(item.attr.stamp);
162 return when >= query.start; 176 return when >= query.start;
163 end); 177 end, iter);
164 end 178 end
165 if query["end"] then 179 if query["end"] then
166 items:filter(function (item) 180 iter = it.filter(function(item)
167 local when = item.when or datetime.parse(item.attr.stamp); 181 local when = item.when or datetime.parse(item.attr.stamp);
168 return when <= query["end"]; 182 return when <= query["end"];
169 end); 183 end, iter);
170 end 184 end
171 if query.total then 185 if query.after then
172 count = #items; 186 local found = false;
173 end 187 iter = it.filter(function(item)
174 if query.reverse then 188 local found_after = found;
175 items:reverse(); 189 if item.key == query.after then
176 if query.before then 190 found = true
177 local found = false;
178 for j = 1, #items do
179 if (items[j].key or tostring(j)) == query.before then
180 found = true;
181 i = j;
182 break;
183 end
184 end 191 end
185 if not found then 192 return found_after;
186 return nil, "item-not-found"; 193 end, iter);
194 end
195 if query.before then
196 local found = false;
197 iter = it.filter(function(item)
198 if item.key == query.before then
199 found = true
187 end 200 end
188 end 201 return not found;
189 last_key = query.after; 202 end, iter);
190 elseif query.after then 203 end
191 local found = false; 204 if query.limit then
192 for j = 1, #items do 205 iter = it.head(query.limit, iter);
193 if (items[j].key or tostring(j)) == query.after then 206 end
194 found = true; 207 end
195 i = j; 208
196 break; 209 return function()
197 end 210 local item = iter();
198 end 211 if item == nil then
199 if not found then 212 return
200 return nil, "item-not-found"; 213 end
201 end 214 local key = item.key;
202 last_key = query.before; 215 local when = item.when or item.attr and datetime.parse(item.attr.stamp);
203 elseif query.before then
204 last_key = query.before;
205 end
206 if query.limit and #items - i > query.limit then
207 items[i+query.limit+1] = nil;
208 end
209 end
210 return function ()
211 i = i + 1;
212 local item = items[i];
213 if not item or (last_key and item.key == last_key) then
214 return;
215 end
216 local key = item.key or tostring(i);
217 local when = item.when or datetime.parse(item.attr.stamp);
218 local with = item.with; 216 local with = item.with;
219 item.key, item.when, item.with = nil, nil, nil; 217 item.key, item.when, item.with = nil, nil, nil;
220 item.attr.stamp = nil; 218 item.attr.stamp = nil;
221 -- COMPAT Stored data may still contain legacy XEP-0091 timestamp 219 -- COMPAT Stored data may still contain legacy XEP-0091 timestamp
222 item.attr.stamp_legacy = nil; 220 item.attr.stamp_legacy = nil;
223 item = st.deserialize(item); 221 item = st.deserialize(item);
224 return key, item, when, with; 222 return key, item, when, with;
225 end, count; 223 end
226 end 224 end
227 225
228 function archive:get(username, wanted_key) 226 function archive:get(username, wanted_key)
229 local iter, err = self:find(username, { key = wanted_key }) 227 local iter, err = self:find(username, { key = wanted_key })
230 if not iter then return iter, err; end 228 if not iter then return iter, err; end