Software /
code /
prosody
Comparison
util/datamanager.lua @ 13134:638f627e707f
util.datamanager: Add O(1) list indexing with on-disk index
Index file contains offsets and lengths of each item() which allows
seeking directly to each item and reading it without parsing the entire
file.
Also allows tricks like binary search, assuming items have some defined
order.
We take advantage of the 1-based indexing in tables to store a magic
header in the 0 position, so that table index 1 ends up at file index 1.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Tue, 11 May 2021 02:09:56 +0200 |
parent | 12975:d10957394a3c |
child | 13137:b417a49cc31b |
comparison
equal
deleted
inserted
replaced
13133:3692265becb7 | 13134:638f627e707f |
---|---|
5 -- This project is MIT/X11 licensed. Please see the | 5 -- This project is MIT/X11 licensed. Please see the |
6 -- COPYING file in the source package for more information. | 6 -- COPYING file in the source package for more information. |
7 -- | 7 -- |
8 | 8 |
9 | 9 |
10 local string = string; | |
10 local format = string.format; | 11 local format = string.format; |
11 local setmetatable = setmetatable; | 12 local setmetatable = setmetatable; |
12 local ipairs = ipairs; | 13 local ipairs = ipairs; |
13 local char = string.char; | 14 local char = string.char; |
14 local pcall = pcall; | 15 local pcall = pcall; |
15 local log = require "prosody.util.logger".init("datamanager"); | 16 local log = require "prosody.util.logger".init("datamanager"); |
16 local io_open = io.open; | 17 local io_open = io.open; |
17 local os_remove = os.remove; | 18 local os_remove = os.remove; |
18 local os_rename = os.rename; | 19 local os_rename = os.rename; |
19 local tonumber = tonumber; | 20 local tonumber = tonumber; |
21 local floor = math.floor; | |
20 local next = next; | 22 local next = next; |
21 local type = type; | 23 local type = type; |
22 local t_insert = table.insert; | 24 local t_insert = table.insert; |
23 local t_concat = table.concat; | 25 local t_concat = table.concat; |
24 local envloadfile = require"prosody.util.envload".envloadfile; | 26 local envloadfile = require"prosody.util.envload".envloadfile; |
27 local envload = require"prosody.util.envload".envload; | |
25 local serialize = require "prosody.util.serialization".serialize; | 28 local serialize = require "prosody.util.serialization".serialize; |
26 local lfs = require "lfs"; | 29 local lfs = require "lfs"; |
27 -- Extract directory separator from package.config (an undocumented string that comes with lua) | 30 -- Extract directory separator from package.config (an undocumented string that comes with lua) |
28 local path_separator = assert ( package.config:match ( "^([^\n]+)" ) , "package.config not in standard form" ) | 31 local path_separator = assert ( package.config:match ( "^([^\n]+)" ) , "package.config not in standard form" ) |
29 | 32 |
253 end | 256 end |
254 | 257 |
255 return true, pos; | 258 return true, pos; |
256 end | 259 end |
257 | 260 |
-- Layout of the on-disk list index ("lidx" files). All three values stay nil
-- when the running Lua has no string.packsize (i.e. no string.pack support),
-- which callers use as the signal that indexing is unavailable.
local index_fmt, index_item_size, index_magic;
if string.packsize then
	local record_fmt = "TT"; -- struct { size_t start, size_t length }
	index_fmt = record_fmt;
	-- Every record, the header included, occupies this many bytes
	index_item_size = string.packsize(record_fmt);
	-- Header record: 7767639 is "prosody" typed on a T9 keypad, followed by the version number
	index_magic = string.pack(record_fmt, 7767639, 1);
end
267 | |
258 local function list_append(username, host, datastore, data) | 268 local function list_append(username, host, datastore, data) |
259 if not data then return; end | 269 if not data then return; end |
260 if callback(username, host, datastore) == false then return true; end | 270 if callback(username, host, datastore) == false then return true; end |
261 -- save the datastore | 271 -- save the datastore |
262 | 272 |
265 if not ok then | 275 if not ok then |
266 log("error", "Unable to write to %s storage ('%s' in %s) for user: %s@%s", | 276 log("error", "Unable to write to %s storage ('%s' in %s) for user: %s@%s", |
267 datastore, msg, where, username or "nil", host or "nil"); | 277 datastore, msg, where, username or "nil", host or "nil"); |
268 return ok, msg; | 278 return ok, msg; |
269 end | 279 end |
280 if string.packsize then | |
281 local offset = type(msg) == "number" and msg or 0; | |
282 local index_entry = string.pack(index_fmt, offset, #data); | |
283 if offset == 0 then | |
284 index_entry = index_magic .. index_entry; | |
285 end | |
286 local ok, off = append(username, host, datastore, "lidx", index_entry); | |
287 off = off or 0; | |
288 -- If this was the first item, then both the data and index offsets should | |
289 -- be zero, otherwise there's some kind of mismatch and we should drop the | |
290 -- index and recreate it from scratch | |
291 -- TODO Actually rebuild the index in this case? | |
292 if not ok or (off == 0 and offset ~= 0) or (off ~= 0 and offset == 0) then | |
293 os_remove(getpath(username, host, datastore, "lidx")); | |
294 end | |
295 end | |
270 return true; | 296 return true; |
271 end | 297 end |
272 | 298 |
273 local function list_store(username, host, datastore, data) | 299 local function list_store(username, host, datastore, data) |
274 if not data then | 300 if not data then |
278 -- save the datastore | 304 -- save the datastore |
279 local d = {}; | 305 local d = {}; |
280 for i, item in ipairs(data) do | 306 for i, item in ipairs(data) do |
281 d[i] = "item(" .. serialize(item) .. ");\n"; | 307 d[i] = "item(" .. serialize(item) .. ");\n"; |
282 end | 308 end |
309 os_remove(getpath(username, host, datastore, "lidx")); | |
283 local ok, msg = atomic_store(getpath(username, host, datastore, "list", true), t_concat(d)); | 310 local ok, msg = atomic_store(getpath(username, host, datastore, "list", true), t_concat(d)); |
284 if not ok then | 311 if not ok then |
285 log("error", "Unable to write to %s storage ('%s') for user: %s@%s", datastore, msg, username or "nil", host or "nil"); | 312 log("error", "Unable to write to %s storage ('%s') for user: %s@%s", datastore, msg, username or "nil", host or "nil"); |
286 return; | 313 return; |
287 end | 314 end |
291 end | 318 end |
292 -- we write data even when we are deleting because lua doesn't have a | 319 -- we write data even when we are deleting because lua doesn't have a |
293 -- platform independent way of checking for nonexisting files | 320 -- platform independent way of checking for nonexisting files |
294 return true; | 321 return true; |
295 end | 322 end |
323 | |
--- Scan a list store file and work out the byte range of every item in it.
-- A new entry begins at each line whose first four characters are "item" and
-- runs up to the next such line, or to the end of the file.
-- @param username, host, datastore  identify the store; passed on to getpath()
-- @param items  optional existing index (array of { start = ..., length = ... }).
--               When it has at least one entry, scanning resumes right after
--               the last indexed item and new entries are appended to it.
-- @return the index table on success; on failure a falsy value followed by an
--         error message (plus errno when the file could not be opened)
local function build_list_index(username, host, datastore, items)
	log("debug", "Building index for (%s@%s/%s)", username, host, datastore);
	local filename = getpath(username, host, datastore, "list");
	local fh, err, errno = io_open(filename);
	if not fh then
		return fh, err, errno;
	end

	-- Offset where this scan begins: 0 for a fresh index, otherwise the end of
	-- the last item that is already indexed.
	local scan_start = 0;
	if items and items[1] then
		local last_item = items[#items];
		local resume_pos, seek_err = fh:seek("set", last_item.start + last_item.length);
		if not resume_pos then
			fh:close();
			return nil, seek_err;
		end
		scan_start = resume_pos;
	else
		items = {};
	end

	-- Both positions start at scan_start (not 0), so that a resumed scan
	-- records the correct offset for the first item it encounters.
	local prev_pos = scan_start; -- position before reading the current line
	local last_item_start = scan_start;

	for line in fh:lines() do
		if line:sub(1, 4) == "item" then
			if prev_pos ~= scan_start then
				t_insert(items, { start = last_item_start; length = prev_pos - last_item_start });
			end
			last_item_start = prev_pos
		end
		-- After a line has been read the handle sits at the start of the next
		-- line, so save that position for use during the next iteration.
		prev_pos = fh:seek()
	end
	-- The handle is only needed for the scan; release it instead of leaking it.
	fh:close();

	-- Whatever was read after the last "item" line belongs to the final entry.
	if prev_pos ~= scan_start then
		t_insert(items, { start = last_item_start; length = prev_pos - last_item_start });
	end
	return items;
end
357 | |
--- Serialize an index and write it to the store's "lidx" file.
-- The output is the magic header followed by one packed (start, length)
-- record per index entry, in order.
-- @param username, host, datastore  identify the store; passed on to getpath()
-- @param index  sequence of entries with numeric `start` and `length` fields
-- @return whatever atomic_store() returns
local function store_list_index(username, host, datastore, index)
	local records = { index_magic };
	for position, entry in ipairs(index) do
		local packed = string.pack(index_fmt, entry.start, entry.length);
		-- Shifted by one because slot 1 holds the header
		records[position + 1] = packed;
	end
	local index_path = getpath(username, host, datastore, "lidx");
	local serialized = t_concat(records);
	return atomic_store(index_path, serialized);
end
366 | |
-- Metatable for a lazily loaded index backed by an open "lidx" file handle
-- stored in the `file` field of the table.
local index_mt = {
	--- Read index entry `i` from the file on first access and cache it.
	-- @return { start = ..., length = ... }, or nil when `i` is not a positive
	--         whole number or no complete record exists at that position
	__index = function(t, i)
		if type(i) ~= "number" or i % 1 ~= 0 or i < 1 then
			return
		end
		local fh = t.file;
		-- The header occupies record 0, so entry i starts at i * record size
		local pos = i * index_item_size;
		if fh:seek("set", pos) ~= pos then
			return nil
		end
		local data = fh:read(index_item_size);
		-- A truncated file can yield a partial record, which string.unpack
		-- would raise an error on; treat it like a missing entry instead.
		if not data or #data ~= index_item_size then
			return nil
		end
		local start, length = string.unpack(index_fmt, data);
		local v = { start = start; length = length };
		-- Cache the entry so later lookups do not touch the file again
		t[i] = v;
		return v;
	end;
	--- Number of entries, derived from the size of the index file.
	__len = function(t)
		-- Account for both the header and the fence post error
		return floor(t.file:seek("end") / index_item_size) - 1;
	end;
}
394 | |
--- Get the index for a list store, building and saving it if necessary.
-- If an "lidx" file exists and starts with the expected magic header, the
-- result is a lazy table (see index_mt) that reads entries from that file on
-- demand. Otherwise the index is built by scanning the list file, a
-- best-effort attempt is made to store it, and the plain table is returned.
-- @param username, host, datastore  identify the store; passed on to getpath()
-- @return the index, or a falsy value and an error message when it could not be built
local function get_list_index(username, host, datastore)
	log("debug", "Loading index for (%s@%s/%s)", username, host, datastore);
	local index_filename = getpath(username, host, datastore, "lidx");
	local ih = io_open(index_filename);
	if ih then
		local magic = ih:read(#index_magic);
		if magic ~= index_magic then
			log("warn", "Index %q has wrong version number (got %q, expected %q)", index_filename, magic, index_magic);
			-- wrong version or something
			ih:close();
			ih = nil;
		end
	end

	if ih then
		-- The handle stays open; entries are read through it lazily
		return setmetatable({ file = ih }, index_mt);
	end

	local index, err = build_list_index(username, host, datastore);
	if not index then
		return index, err
	end

	-- TODO How to handle failure to store the index?
	local dontcare = store_list_index(username, host, datastore, index); -- luacheck: ignore 211/dontcare
	return index;
end
422 | |
--- Load a single item from an open list file.
-- Reads `length` bytes starting at byte offset `start`, evaluates them as a
-- chunk whose environment only provides item(), and returns the value that
-- was passed to item().
-- @param fh  open file handle for the list file
-- @param start  byte offset of the item
-- @param length  number of bytes the item occupies
-- @return the item; nothing/nil when seeking or reading the full range fails;
--         a falsy value plus error details when the chunk fails to load or run
local function list_load_one(fh, start, length)
	local reached = fh:seek("set", start);
	if reached ~= start then
		return nil
	end
	local chunk_text = fh:read(length)
	if not (chunk_text and #chunk_text == length) then
		return
	end
	local captured;
	local sandbox = {
		item = function(value)
			captured = value;
		end;
	};
	local chunk, err, errno = envload(chunk_text, "@list", sandbox);
	if not chunk then
		return chunk, err, errno
	end
	local ran, failure = pcall(chunk);
	if ran then
		return captured;
	end
	return ran, failure;
end
446 | |
-- Metatable for a lazily loaded list: the table carries an open list file in
-- `file` and an index in `index`.
local indexed_list_mt = {
	--- Fetch item `i` by looking up its byte range in the index and loading
	-- just that range from the list file. Yields nothing for keys that are
	-- not positive whole numbers or have no index entry.
	__index = function(t, i)
		local is_position = type(i) == "number" and i % 1 == 0 and i >= 1;
		if not is_position then
			return
		end
		local entry = t.index[i];
		if entry then
			-- Parentheses keep only the first return value
			return (list_load_one(t.file, entry.start, entry.length));
		end
	end;
	--- The list has exactly as many items as its index has entries.
	__len = function(t)
		local index = t.index;
		return #index;
	end;
}
296 | 463 |
297 local function list_load(username, host, datastore) | 464 local function list_load(username, host, datastore) |
298 local items = {}; | 465 local items = {}; |
299 local data, err, errno = envloadfile(getpath(username, host, datastore, "list"), {item = function(i) t_insert(items, i); end}); | 466 local data, err, errno = envloadfile(getpath(username, host, datastore, "list"), {item = function(i) t_insert(items, i); end}); |
300 if not data then | 467 if not data then |
310 if not success then | 477 if not success then |
311 log("error", "Unable to load %s storage ('%s') for user: %s@%s", datastore, ret, username or "nil", host or "nil"); | 478 log("error", "Unable to load %s storage ('%s') for user: %s@%s", datastore, ret, username or "nil", host or "nil"); |
312 return nil, "Error reading storage"; | 479 return nil, "Error reading storage"; |
313 end | 480 end |
314 return items; | 481 return items; |
482 end | |
483 | |
--- Open a list store for lazy, indexed access.
-- Without string.pack support (index_magic is nil) this falls back to
-- list_load(), which reads the whole list. Otherwise it returns a table whose
-- items are loaded from disk on demand (see indexed_list_mt).
-- @param username, host, datastore  identify the store; passed on to getpath()
-- @return the list; nil when the list file does not exist; or a falsy value,
--         error message (and errno) when the file or its index cannot be opened
local function list_open(username, host, datastore)
	if not index_magic then
		log("warn", "Falling back from lazy loading to loading full list for %s storage for user: %s@%s", datastore, username or "nil", host or "nil");
		return list_load(username, host, datastore);
	end
	local filename = getpath(username, host, datastore, "list");
	local file, err, errno = io_open(filename);
	if not file then
		-- NOTE(review): ENOENT is not defined in the part of the file visible
		-- here; confirm it is declared at file level, otherwise this only
		-- matches when errno is nil.
		if errno == ENOENT then
			return nil;
		end
		return file, err, errno;
	end
	local index, index_err = get_list_index(username, host, datastore);
	if not index then
		-- Do not leak the list file handle when no index is available
		file:close()
		return index, index_err;
	end
	return setmetatable({ file = file; index = index }, indexed_list_mt);
end
316 | 504 |
317 local type_map = { | 505 local type_map = { |
318 keyval = "dat"; | 506 keyval = "dat"; |
319 list = "list"; | 507 list = "list"; |
412 users = users; | 600 users = users; |
413 stores = stores; | 601 stores = stores; |
414 purge = purge; | 602 purge = purge; |
415 path_decode = decode; | 603 path_decode = decode; |
416 path_encode = encode; | 604 path_encode = encode; |
605 | |
606 build_list_index = build_list_index; | |
607 list_open = list_open; | |
417 }; | 608 }; |