Software /
code /
prosody
Comparison
plugins/mod_storage_internal.lua @ 13135:3fd24e1945b0
mod_storage_internal: Lazy-load archive items while iterating
Very large list files previously ran into limits of the Lua parser, or
just caused Prosody to freeze while parsing.
Using the new index we can parse individual items one at a time. This
probably won't reduce overall CPU usage, probably the opposite, but it
will reduce the number of items in memory at once and allow collection
of items after we iterated past them.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Wed, 12 May 2021 01:25:44 +0200 |
parent | 12977:74b9e05af71e |
child | 13136:396db0e7084f |
comparison
equal
deleted
inserted
replaced
13134:638f627e707f | 13135:3fd24e1945b0 |
---|---|
5 local st = require "prosody.util.stanza"; | 5 local st = require "prosody.util.stanza"; |
6 local now = require "prosody.util.time".now; | 6 local now = require "prosody.util.time".now; |
7 local id = require "prosody.util.id".medium; | 7 local id = require "prosody.util.id".medium; |
8 local jid_join = require "prosody.util.jid".join; | 8 local jid_join = require "prosody.util.jid".join; |
9 local set = require "prosody.util.set"; | 9 local set = require "prosody.util.set"; |
10 local it = require "prosody.util.iterators"; | |
10 | 11 |
11 local host = module.host; | 12 local host = module.host; |
12 | 13 |
13 local archive_item_limit = module:get_option_number("storage_archive_item_limit", 10000); | 14 local archive_item_limit = module:get_option_number("storage_archive_item_limit", 10000); |
14 local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000)); | 15 local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000)); |
120 archive_item_count_cache:set(cache_key, item_count+1); | 121 archive_item_count_cache:set(cache_key, item_count+1); |
121 return key; | 122 return key; |
122 end | 123 end |
123 | 124 |
124 function archive:find(username, query) | 125 function archive:find(username, query) |
125 local items, err = datamanager.list_load(username, host, self.store); | 126 local list, err = datamanager.list_open(username, host, self.store); |
126 if not items then | 127 if not list then |
127 if err then | 128 if err then |
128 return items, err; | 129 return list, err; |
129 elseif query then | 130 elseif query then |
130 if query.before or query.after then | 131 if query.before or query.after then |
131 return nil, "item-not-found"; | 132 return nil, "item-not-found"; |
132 end | 133 end |
133 if query.total then | 134 if query.total then |
134 return function () end, 0; | 135 return function() |
136 end, 0; | |
135 end | 137 end |
136 end | 138 end |
137 return function () end; | 139 return function() |
138 end | 140 end; |
139 local count = nil; | 141 end |
140 local i, last_key = 0; | 142 |
143 local i = 0; | |
144 local iter = function() | |
145 i = i + 1; | |
146 return list[i] | |
147 end | |
148 | |
141 if query then | 149 if query then |
142 items = array(items); | 150 if query.reverse then |
151 i = #list + 1 | |
152 iter = function() | |
153 i = i - 1 | |
154 return list[i] | |
155 end | |
156 end | |
143 if query.key then | 157 if query.key then |
144 items:filter(function (item) | 158 iter = it.filter(function(item) |
145 return item.key == query.key; | 159 return item.key == query.key; |
146 end); | 160 end, iter); |
147 end | 161 end |
148 if query.ids then | 162 if query.ids then |
149 local ids = set.new(query.ids); | 163 local ids = set.new(query.ids); |
150 items:filter(function (item) | 164 iter = it.filter(function(item) |
151 return ids:contains(item.key); | 165 return ids:contains(item.key); |
152 end); | 166 end, iter); |
153 end | 167 end |
154 if query.with then | 168 if query.with then |
155 items:filter(function (item) | 169 iter = it.filter(function(item) |
156 return item.with == query.with; | 170 return item.with == query.with; |
157 end); | 171 end, iter); |
158 end | 172 end |
159 if query.start then | 173 if query.start then |
160 items:filter(function (item) | 174 iter = it.filter(function(item) |
161 local when = item.when or datetime.parse(item.attr.stamp); | 175 local when = item.when or datetime.parse(item.attr.stamp); |
162 return when >= query.start; | 176 return when >= query.start; |
163 end); | 177 end, iter); |
164 end | 178 end |
165 if query["end"] then | 179 if query["end"] then |
166 items:filter(function (item) | 180 iter = it.filter(function(item) |
167 local when = item.when or datetime.parse(item.attr.stamp); | 181 local when = item.when or datetime.parse(item.attr.stamp); |
168 return when <= query["end"]; | 182 return when <= query["end"]; |
169 end); | 183 end, iter); |
170 end | 184 end |
171 if query.total then | 185 if query.after then |
172 count = #items; | 186 local found = false; |
173 end | 187 iter = it.filter(function(item) |
174 if query.reverse then | 188 local found_after = found; |
175 items:reverse(); | 189 if item.key == query.after then |
176 if query.before then | 190 found = true |
177 local found = false; | |
178 for j = 1, #items do | |
179 if (items[j].key or tostring(j)) == query.before then | |
180 found = true; | |
181 i = j; | |
182 break; | |
183 end | |
184 end | 191 end |
185 if not found then | 192 return found_after; |
186 return nil, "item-not-found"; | 193 end, iter); |
194 end | |
195 if query.before then | |
196 local found = false; | |
197 iter = it.filter(function(item) | |
198 if item.key == query.before then | |
199 found = true | |
187 end | 200 end |
188 end | 201 return not found; |
189 last_key = query.after; | 202 end, iter); |
190 elseif query.after then | 203 end |
191 local found = false; | 204 if query.limit then |
192 for j = 1, #items do | 205 iter = it.head(query.limit, iter); |
193 if (items[j].key or tostring(j)) == query.after then | 206 end |
194 found = true; | 207 end |
195 i = j; | 208 |
196 break; | 209 return function() |
197 end | 210 local item = iter(); |
198 end | 211 if item == nil then |
199 if not found then | 212 return |
200 return nil, "item-not-found"; | 213 end |
201 end | 214 local key = item.key; |
202 last_key = query.before; | 215 local when = item.when or item.attr and datetime.parse(item.attr.stamp); |
203 elseif query.before then | |
204 last_key = query.before; | |
205 end | |
206 if query.limit and #items - i > query.limit then | |
207 items[i+query.limit+1] = nil; | |
208 end | |
209 end | |
210 return function () | |
211 i = i + 1; | |
212 local item = items[i]; | |
213 if not item or (last_key and item.key == last_key) then | |
214 return; | |
215 end | |
216 local key = item.key or tostring(i); | |
217 local when = item.when or datetime.parse(item.attr.stamp); | |
218 local with = item.with; | 216 local with = item.with; |
219 item.key, item.when, item.with = nil, nil, nil; | 217 item.key, item.when, item.with = nil, nil, nil; |
220 item.attr.stamp = nil; | 218 item.attr.stamp = nil; |
221 -- COMPAT Stored data may still contain legacy XEP-0091 timestamp | 219 -- COMPAT Stored data may still contain legacy XEP-0091 timestamp |
222 item.attr.stamp_legacy = nil; | 220 item.attr.stamp_legacy = nil; |
223 item = st.deserialize(item); | 221 item = st.deserialize(item); |
224 return key, item, when, with; | 222 return key, item, when, with; |
225 end, count; | 223 end |
226 end | 224 end |
227 | 225 |
228 function archive:get(username, wanted_key) | 226 function archive:get(username, wanted_key) |
229 local iter, err = self:find(username, { key = wanted_key }) | 227 local iter, err = self:find(username, { key = wanted_key }) |
230 if not iter then return iter, err; end | 228 if not iter then return iter, err; end |