Comparison

plugins/mod_cron.lua @ 13265:6ac5ad578565

mod_cron: Load last task run time inside task runner to fix async This ensures that all interactions with storage happen inside an async thread, allowing async waiting to be performed in storage drivers.
author Kim Alvefur <zash@zash.se>
date Sat, 14 Oct 2023 22:32:33 +0200
parent 13264:9b720c38fee8
child 13269:d50bee584969
comparison
equal deleted inserted replaced
13264:9b720c38fee8 13265:6ac5ad578565
10 function module.add_host(host_module) 10 function module.add_host(host_module)
11 11
12 local last_run_times = host_module:open_store("cron", "map"); 12 local last_run_times = host_module:open_store("cron", "map");
13 active_hosts[host_module.host] = true; 13 active_hosts[host_module.host] = true;
14 14
15 local function save_task(task, started_at) last_run_times:set(nil, task.id, started_at); end 15 local function save_task(task, started_at)
16 last_run_times:set(nil, task.id, started_at);
17 end
18
19 local function restore_task(task)
20 if task.last == nil then
21 task.last = last_run_times:get(nil, task.id);
22 end
23 end
16 24
17 local function task_added(event) 25 local function task_added(event)
18 local task = event.item; 26 local task = event.item;
19 if task.name == nil then task.name = task.when; end 27 if task.name == nil then
20 if task.id == nil then task.id = event.source.name .. "/" .. task.name:gsub("%W", "_"):lower(); end 28 task.name = task.when;
21 if task.last == nil then task.last = last_run_times:get(nil, task.id); end 29 end
30 if task.id == nil then
31 task.id = event.source.name .. "/" .. task.name:gsub("%W", "_"):lower();
32 end
33 task.restore = restore_task;
22 task.save = save_task; 34 task.save = save_task;
23 module:log("debug", "%s task %s added, last run %s", task.when, task.id, 35 module:log("debug", "%s task %s added", task.when, task.id);
24 task.last and datetime.datetime(task.last) or "never");
25 return true 36 return true
26 end 37 end
27 38
28 local function task_removed(event) 39 local function task_removed(event)
29 local task = event.item; 40 local task = event.item;
31 return true 42 return true
32 end 43 end
33 44
34 host_module:handle_items("task", task_added, task_removed, true); 45 host_module:handle_items("task", task_added, task_removed, true);
35 46
36 function host_module.unload() active_hosts[host_module.host] = nil; end 47 function host_module.unload()
48 active_hosts[host_module.host] = nil;
49 end
37 end 50 end
38 51
39 local function should_run(when, last) return not last or last + periods[when] * 0.995 <= os.time() end 52 local function should_run(when, last)
53 return not last or last + periods[when] * 0.995 <= os.time()
54 end
40 55
41 local function run_task(task) 56 local function run_task(task)
57 task:restore();
58 if not should_run(task.when, task.last) then
59 return
60 end
42 local started_at = os.time(); 61 local started_at = os.time();
43 task:run(started_at); 62 task:run(started_at);
44 task.last = started_at; 63 task.last = started_at;
45 task:save(started_at); 64 task:save(started_at);
46 end 65 end
50 module:log("info", "Running periodic tasks"); 69 module:log("info", "Running periodic tasks");
51 local delay = 3600; 70 local delay = 3600;
52 for host in pairs(active_hosts) do 71 for host in pairs(active_hosts) do
53 module:log("debug", "Running periodic tasks for host %s", host); 72 module:log("debug", "Running periodic tasks for host %s", host);
54 for _, task in ipairs(module:context(host):get_host_items("task")) do 73 for _, task in ipairs(module:context(host):get_host_items("task")) do
55 module:log("debug", "Considering %s task %s (%s)", task.when, task.id, task.run); 74 task_runner:run(task);
56 if should_run(task.when, task.last) then task_runner:run(task); end
57 end 75 end
58 end 76 end
59 module:log("debug", "Wait %ds", delay); 77 module:log("debug", "Wait %ds", delay);
60 return delay 78 return delay
61 end); 79 end);