Comparison

util/async.lua @ 8600:96f20cf92b84

util.async: Add per-runner ids and add runner:log() method
author Matthew Wild <mwild1@gmail.com>
date Fri, 16 Mar 2018 14:59:41 +0000
parent 8408:b751bee6a829
child 8601:ce5b16e13573
comparison
equal deleted inserted replaced
8599:62bfc85a53c8 8600:96f20cf92b84
1 local log = require "util.logger".init("util.async"); 1 local log = require "util.logger".init("util.async");
2 local new_id = require "util.id".short;
2 3
3 local function checkthread() 4 local function checkthread()
4 local thread, main = coroutine.running(); 5 local thread, main = coroutine.running();
5 if not thread or main then 6 if not thread or main then
6 error("Not running in an async context, see https://prosody.im/doc/developers/util/async"); 7 error("Not running in an async context, see https://prosody.im/doc/developers/util/async");
97 end 98 end
98 99
99 local empty_watchers = {}; 100 local empty_watchers = {};
100 local function runner(func, watchers, data) 101 local function runner(func, watchers, data)
101 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", 102 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready",
102 queue = {}, watchers = watchers or empty_watchers, data = data } 103 queue = {}, watchers = watchers or empty_watchers, data = data, id = new_id() }
103 , runner_mt); 104 , runner_mt);
104 end 105 end
105 106
106 -- Add a task item for the runner to process 107 -- Add a task item for the runner to process
107 function runner_mt:run(input) 108 function runner_mt:run(input)
108 if input ~= nil then 109 if input ~= nil then
110 self:log("debug", "queued new work item, %d items queued", #self.queue);
109 table.insert(self.queue, input); 111 table.insert(self.queue, input);
110 end 112 end
111 if self.state ~= "ready" then 113 if self.state ~= "ready" then
112 -- The runner is busy. Indicate that the task item has been 114 -- The runner is busy. Indicate that the task item has been
113 -- queued, and return information about the current runner state 115 -- queued, and return information about the current runner state
114 return true, self.state, #self.queue; 116 return true, self.state, #self.queue;
115 end 117 end
116 118
117 local q, thread = self.queue, self.thread; 119 local q, thread = self.queue, self.thread;
118 if not thread or coroutine.status(thread) == "dead" then 120 if not thread or coroutine.status(thread) == "dead" then
121 self:log("debug", "creating new coroutine");
119 -- Create a new coroutine for this runner 122 -- Create a new coroutine for this runner
120 thread = runner_create_thread(self.func, self); 123 thread = runner_create_thread(self.func, self);
121 self.thread = thread; 124 self.thread = thread;
122 end 125 end
123 126
124 -- Process task item(s) while the queue is not empty, and we're not blocked 127 -- Process task item(s) while the queue is not empty, and we're not blocked
125 local n, state, err = #q, self.state, nil; 128 local n, state, err = #q, self.state, nil;
126 self.state = "running"; 129 self.state = "running";
130 self:log("debug", "running main loop");
127 while n > 0 and state == "ready" do 131 while n > 0 and state == "ready" do
128 local consumed; 132 local consumed;
129 -- Loop through queue items, and attempt to run them 133 -- Loop through queue items, and attempt to run them
130 for i = 1,n do 134 for i = 1,n do
131 local queued_input = q[i]; 135 local queued_input = q[i];
154 n = #q; 158 n = #q;
155 end 159 end
156 -- Runner processed all items it can, so save current runner state 160 -- Runner processed all items it can, so save current runner state
157 self.state = state; 161 self.state = state;
158 if err or state ~= self.notified_state then 162 if err or state ~= self.notified_state then
163 self:log("debug", "changed state from %s to %s", self.notified_state, err and "error" or state);
159 if err then 164 if err then
160 state = "error" 165 state = "error"
161 else 166 else
162 self.notified_state = state; 167 self.notified_state = state;
163 end 168 end
170 -- Add a task item to the queue without invoking the runner, even if it is idle 175 -- Add a task item to the queue without invoking the runner, even if it is idle
171 function runner_mt:enqueue(input) 176 function runner_mt:enqueue(input)
172 table.insert(self.queue, input); 177 table.insert(self.queue, input);
173 end 178 end
174 179
180 function runner_mt:log(level, fmt, ...)
181 return log(level, "[runner %s] "..fmt, self.id, ...);
182 end
183
175 return { waiter = waiter, guarder = guarder, runner = runner }; 184 return { waiter = waiter, guarder = guarder, runner = runner };