Comparison

util/datamanager.lua @ 13134:638f627e707f

util.datamanager: Add O(1) list indexing with on-disk index. The index file contains the offset and length of each item(), which allows seeking directly to any item and reading it without parsing the entire file. It also allows tricks like binary search, assuming items have some defined order. We take advantage of 1-based indexing in tables to store a magic header in position 0, so that table index 1 ends up at file index 1.
author Kim Alvefur <zash@zash.se>
date Tue, 11 May 2021 02:09:56 +0200
parent 12975:d10957394a3c
child 13137:b417a49cc31b
comparison
equal deleted inserted replaced
13133:3692265becb7 13134:638f627e707f
5 -- This project is MIT/X11 licensed. Please see the 5 -- This project is MIT/X11 licensed. Please see the
6 -- COPYING file in the source package for more information. 6 -- COPYING file in the source package for more information.
7 -- 7 --
8 8
9 9
10 local string = string;
10 local format = string.format; 11 local format = string.format;
11 local setmetatable = setmetatable; 12 local setmetatable = setmetatable;
12 local ipairs = ipairs; 13 local ipairs = ipairs;
13 local char = string.char; 14 local char = string.char;
14 local pcall = pcall; 15 local pcall = pcall;
15 local log = require "prosody.util.logger".init("datamanager"); 16 local log = require "prosody.util.logger".init("datamanager");
16 local io_open = io.open; 17 local io_open = io.open;
17 local os_remove = os.remove; 18 local os_remove = os.remove;
18 local os_rename = os.rename; 19 local os_rename = os.rename;
19 local tonumber = tonumber; 20 local tonumber = tonumber;
21 local floor = math.floor;
20 local next = next; 22 local next = next;
21 local type = type; 23 local type = type;
22 local t_insert = table.insert; 24 local t_insert = table.insert;
23 local t_concat = table.concat; 25 local t_concat = table.concat;
24 local envloadfile = require"prosody.util.envload".envloadfile; 26 local envloadfile = require"prosody.util.envload".envloadfile;
27 local envload = require"prosody.util.envload".envload;
25 local serialize = require "prosody.util.serialization".serialize; 28 local serialize = require "prosody.util.serialization".serialize;
26 local lfs = require "lfs"; 29 local lfs = require "lfs";
27 -- Extract directory separator from package.config (an undocumented string that comes with lua) 30 -- Extract directory separator from package.config (an undocumented string that comes with lua)
28 local path_separator = assert ( package.config:match ( "^([^\n]+)" ) , "package.config not in standard form" ) 31 local path_separator = assert ( package.config:match ( "^([^\n]+)" ) , "package.config not in standard form" )
29 32
253 end 256 end
254 257
255 return true, pos; 258 return true, pos;
256 end 259 end
257 260
-- On-disk list index format (needs string.pack, i.e. Lua 5.3+).
-- Each record is two native size_t values: the byte offset of an item in
-- the .list file and its byte length. Record slot 0 holds a magic header,
-- so record i lives at byte offset i * index_item_size and list item 1
-- maps to record 1. When string.pack is unavailable all three stay nil and
-- list_open falls back to list_load.
local index_fmt, index_item_size, index_magic;
do
	local packsize = string.packsize;
	if packsize then
		local fmt = "TT"; -- struct { size_t start, size_t length }
		index_fmt = fmt;
		index_item_size = packsize(fmt);
		-- 7767639 is "prosody" typed on a T9 keypad; 1 is the format version.
		index_magic = string.pack(fmt, 7767639, 1);
	end
end
267
258 local function list_append(username, host, datastore, data) 268 local function list_append(username, host, datastore, data)
259 if not data then return; end 269 if not data then return; end
260 if callback(username, host, datastore) == false then return true; end 270 if callback(username, host, datastore) == false then return true; end
261 -- save the datastore 271 -- save the datastore
262 272
265 if not ok then 275 if not ok then
266 log("error", "Unable to write to %s storage ('%s' in %s) for user: %s@%s", 276 log("error", "Unable to write to %s storage ('%s' in %s) for user: %s@%s",
267 datastore, msg, where, username or "nil", host or "nil"); 277 datastore, msg, where, username or "nil", host or "nil");
268 return ok, msg; 278 return ok, msg;
269 end 279 end
280 if string.packsize then
281 local offset = type(msg) == "number" and msg or 0;
282 local index_entry = string.pack(index_fmt, offset, #data);
283 if offset == 0 then
284 index_entry = index_magic .. index_entry;
285 end
286 local ok, off = append(username, host, datastore, "lidx", index_entry);
287 off = off or 0;
288 -- If this was the first item, then both the data and index offsets should
289 -- be zero, otherwise there's some kind of mismatch and we should drop the
290 -- index and recreate it from scratch
291 -- TODO Actually rebuild the index in this case?
292 if not ok or (off == 0 and offset ~= 0) or (off ~= 0 and offset == 0) then
293 os_remove(getpath(username, host, datastore, "lidx"));
294 end
295 end
270 return true; 296 return true;
271 end 297 end
272 298
273 local function list_store(username, host, datastore, data) 299 local function list_store(username, host, datastore, data)
274 if not data then 300 if not data then
278 -- save the datastore 304 -- save the datastore
279 local d = {}; 305 local d = {};
280 for i, item in ipairs(data) do 306 for i, item in ipairs(data) do
281 d[i] = "item(" .. serialize(item) .. ");\n"; 307 d[i] = "item(" .. serialize(item) .. ");\n";
282 end 308 end
309 os_remove(getpath(username, host, datastore, "lidx"));
283 local ok, msg = atomic_store(getpath(username, host, datastore, "list", true), t_concat(d)); 310 local ok, msg = atomic_store(getpath(username, host, datastore, "list", true), t_concat(d));
284 if not ok then 311 if not ok then
285 log("error", "Unable to write to %s storage ('%s') for user: %s@%s", datastore, msg, username or "nil", host or "nil"); 312 log("error", "Unable to write to %s storage ('%s') for user: %s@%s", datastore, msg, username or "nil", host or "nil");
286 return; 313 return;
287 end 314 end
291 end 318 end
292 -- we write data even when we are deleting because lua doesn't have a 319 -- we write data even when we are deleting because lua doesn't have a
293 -- platform independent way of checking for nonexisting files 320 -- platform independent way of checking for nonexisting files
294 return true; 321 return true;
295 end 322 end
323
--- Scan a .list file and compute the byte span of every item() record.
-- A record starts on a line beginning with "item" and extends up to the
-- start of the next such line (or the end of the file), so records that
-- span several lines are covered whole. Lines before the first "item" line
-- are not indexed.
-- @param username, host, datastore identify the list file (via getpath).
-- @param items optional existing index (array of { start, length }) to
--   extend: scanning resumes right after its last entry and new entries are
--   appended to this same table.
-- @return the index array, or nil, err[, errno] if the file cannot be
--   opened or positioned.
local function build_list_index(username, host, datastore, items)
	log("debug", "Building index for (%s@%s/%s)", username, host, datastore);
	local filename = getpath(username, host, datastore, "list");
	local fh, err, errno = io_open(filename);
	if not fh then
		return fh, err, errno;
	end

	if items == nil then
		items = {};
	end

	-- Byte offset of the start of the line about to be read.
	local line_start = 0;
	local last_item = items[#items];
	if last_item then
		-- Resume scanning immediately after the last already-indexed item.
		line_start, err = fh:seek("set", last_item.start + last_item.length);
		if not line_start then
			fh:close();
			return nil, err;
		end
	end

	-- Start offset of the record currently being scanned; nil until the
	-- first "item" line has been seen.
	local item_start = nil;
	for line in fh:lines() do
		if line:sub(1, 4) == "item" then
			if item_start then
				t_insert(items, { start = item_start; length = line_start - item_start });
			end
			item_start = line_start;
		end
		-- Once lines() has returned a line, the file position is at the start
		-- of the following line.
		line_start = fh:seek();
	end
	if item_start then
		t_insert(items, { start = item_start; length = line_start - item_start });
	end

	fh:close();
	return items;
end
357
--- Serialize a list index into the .lidx format and write it atomically.
-- The written data is index_magic followed by one packed (start, length)
-- record per entry of `index`, in order.
-- @return whatever atomic_store returns for the .lidx path.
local function store_list_index(username, host, datastore, index)
	local records = { index_magic };
	local count = 1;
	for _, entry in ipairs(index) do
		count = count + 1;
		records[count] = string.pack(index_fmt, entry.start, entry.length);
	end
	return atomic_store(getpath(username, host, datastore, "lidx"), t_concat(records));
end
366
--- Metatable for a lazily read on-disk list index.
-- The table's `file` field is an open handle on the .lidx file. Entry i
-- (1-based) is read on first access from byte offset i * index_item_size
-- (slot 0 holds the magic header), decoded into { start, length } and
-- cached in the table.
local index_mt = {
	__index = function(t, i)
		-- Only positive integer keys correspond to index records.
		if type(i) ~= "number" or i % 1 ~= 0 or i < 1 then
			return
		end
		local fh = t.file;
		local pos = i * index_item_size;
		if fh:seek("set", pos) ~= pos then
			return nil
		end
		local data = fh:read(index_item_size);
		-- Past the end, or a truncated trailing record: treat as absent
		-- instead of letting string.unpack raise on short data.
		if not data or #data < index_item_size then
			return nil
		end
		local start, length = string.unpack(index_fmt, data);
		local v = { start = start; length = length };
		t[i] = v;
		return v;
	end;
	__len = function(t)
		-- Number of whole records in the file, minus the header in slot 0.
		return floor(t.file:seek("end") / index_item_size) - 1;
	end;
}
394
--- Open the on-disk index for a list store, rebuilding it when needed.
-- If the .lidx file exists and starts with index_magic, a lazy proxy over
-- it (index_mt) is returned. Otherwise the index is rebuilt from the .list
-- file with build_list_index and written back with store_list_index; a
-- failure to write it is logged but the freshly built index is still
-- returned.
-- @return the index (lazy proxy or plain array of { start, length }), or
--   nil, err[, errno] if the list file cannot be read.
local function get_list_index(username, host, datastore)
	log("debug", "Loading index for (%s@%s/%s)", username, host, datastore);
	local index_filename = getpath(username, host, datastore, "lidx");
	local ih = io_open(index_filename);
	if ih then
		local magic = ih:read(#index_magic);
		if magic ~= index_magic then
			-- Wrong version, foreign content or truncated header: ignore the
			-- file and rebuild the index from the list below.
			log("warn", "Index %q has wrong version number (got %q, expected %q)", index_filename, magic, index_magic);
			ih:close();
			ih = nil;
		end
	end

	if ih then
		return setmetatable({ file = ih }, index_mt);
	end

	local index, err, errno = build_list_index(username, host, datastore);
	if not index then
		return index, err, errno;
	end

	local stored, store_err = store_list_index(username, host, datastore, index);
	if not stored then
		-- Not fatal: the in-memory index built above is still usable.
		log("warn", "Unable to store index for (%s@%s/%s): %s", username, host, datastore, store_err);
	end
	return index;
end
422
--- Load one list item from an open .list file handle.
-- Reads exactly `length` bytes at byte offset `start` and runs them as a Lua
-- chunk whose only global is item(), which records its argument.
-- @return the recorded item (nil if item() was never called); nothing/nil
--   if the seek or read falls short; nil, err, errno if the chunk fails to
--   load; false, err if running it raises an error.
local function list_load_one(fh, start, length)
	if fh:seek("set", start) ~= start then
		return nil
	end
	local chunk_src = fh:read(length)
	if chunk_src == nil or #chunk_src ~= length then
		return
	end

	local loaded;
	local chunk, err, errno = envload(chunk_src, "@list", {
		item = function(value)
			loaded = value;
		end;
	});
	if not chunk then
		return chunk, err, errno
	end

	local ok, run_err = pcall(chunk);
	if ok then
		return loaded;
	end
	return ok, run_err;
end
446
--- Metatable for a lazily loaded list backed by an open .list file (`file`)
-- and its index (`index`). Element i is read from disk on every access
-- using the index's { start, length } entry; loaded items are not cached.
local indexed_list_mt = {
	__index = function(t, i)
		local is_position = type(i) == "number" and i % 1 == 0 and i >= 1;
		if not is_position then
			return
		end
		local entry = t.index[i];
		if entry then
			-- Parenthesized so that only the item (not any error values) is returned.
			return (list_load_one(t.file, entry.start, entry.length));
		end
	end;
	__len = function(t)
		return #t.index;
	end;
}
296 463
297 local function list_load(username, host, datastore) 464 local function list_load(username, host, datastore)
298 local items = {}; 465 local items = {};
299 local data, err, errno = envloadfile(getpath(username, host, datastore, "list"), {item = function(i) t_insert(items, i); end}); 466 local data, err, errno = envloadfile(getpath(username, host, datastore, "list"), {item = function(i) t_insert(items, i); end});
300 if not data then 467 if not data then
310 if not success then 477 if not success then
311 log("error", "Unable to load %s storage ('%s') for user: %s@%s", datastore, ret, username or "nil", host or "nil"); 478 log("error", "Unable to load %s storage ('%s') for user: %s@%s", datastore, ret, username or "nil", host or "nil");
312 return nil, "Error reading storage"; 479 return nil, "Error reading storage";
313 end 480 end
314 return items; 481 return items;
482 end
483
--- Open a list store for lazy, per-item access.
-- Returns a table whose integer keys load items from disk on demand through
-- the list's on-disk index (obtained from get_list_index). When string.pack
-- is unavailable (index_magic is nil), falls back to list_load instead.
-- @return the list object; nil if the list file does not exist (errno
--   ENOENT); nil, err, errno on other open failures; nil, err[, errno] if
--   no index could be loaded or built.
local function list_open(username, host, datastore)
	if not index_magic then
		log("warn", "Falling back from lazy loading to loading full list for %s storage for user: %s@%s", datastore, username or "nil", host or "nil");
		return list_load(username, host, datastore);
	end
	local filename = getpath(username, host, datastore, "list");
	local file, err, errno = io_open(filename);
	if not file then
		if errno == ENOENT then
			return nil;
		end
		return file, err, errno;
	end
	local index, index_err, index_errno = get_list_index(username, host, datastore);
	if not index then
		-- Don't leak the data file handle when no usable index is available.
		file:close();
		return index, index_err, index_errno;
	end
	return setmetatable({ file = file; index = index }, indexed_list_mt);
end
316 504
317 local type_map = { 505 local type_map = {
318 keyval = "dat"; 506 keyval = "dat";
319 list = "list"; 507 list = "list";
412 users = users; 600 users = users;
413 stores = stores; 601 stores = stores;
414 purge = purge; 602 purge = purge;
415 path_decode = decode; 603 path_decode = decode;
416 path_encode = encode; 604 path_encode = encode;
605
606 build_list_index = build_list_index;
607 list_open = list_open;
417 }; 608 };