Software /
code /
prosody
Comparison
util/async.lua @ 8600:96f20cf92b84
util.async: Add per-runner ids and add runner:log() method
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 16 Mar 2018 14:59:41 +0000 |
parent | 8408:b751bee6a829 |
child | 8601:ce5b16e13573 |
comparison
equal
deleted
inserted
replaced
8599:62bfc85a53c8 | 8600:96f20cf92b84 |
---|---|
1 local log = require "util.logger".init("util.async"); | 1 local log = require "util.logger".init("util.async"); |
| | 2 local new_id = require "util.id".short; |
2 | 3 |
3 local function checkthread() | 4 local function checkthread() |
4 local thread, main = coroutine.running(); | 5 local thread, main = coroutine.running(); |
5 if not thread or main then | 6 if not thread or main then |
6 error("Not running in an async context, see https://prosody.im/doc/developers/util/async"); | 7 error("Not running in an async context, see https://prosody.im/doc/developers/util/async"); |
97 end | 98 end |
98 | 99 |
99 local empty_watchers = {}; | 100 local empty_watchers = {}; |
100 local function runner(func, watchers, data) | 101 local function runner(func, watchers, data) |
101 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", | 102 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", |
102 queue = {}, watchers = watchers or empty_watchers, data = data } | 103 queue = {}, watchers = watchers or empty_watchers, data = data, id = new_id() } |
103 , runner_mt); | 104 , runner_mt); |
104 end | 105 end |
105 | 106 |
106 -- Add a task item for the runner to process | 107 -- Add a task item for the runner to process |
107 function runner_mt:run(input) | 108 function runner_mt:run(input) |
108 if input ~= nil then | 109 if input ~= nil then |
| | 110 self:log("debug", "queued new work item, %d items queued", #self.queue); |
109 table.insert(self.queue, input); | 111 table.insert(self.queue, input); |
110 end | 112 end |
111 if self.state ~= "ready" then | 113 if self.state ~= "ready" then |
112 -- The runner is busy. Indicate that the task item has been | 114 -- The runner is busy. Indicate that the task item has been |
113 -- queued, and return information about the current runner state | 115 -- queued, and return information about the current runner state |
114 return true, self.state, #self.queue; | 116 return true, self.state, #self.queue; |
115 end | 117 end |
116 | 118 |
117 local q, thread = self.queue, self.thread; | 119 local q, thread = self.queue, self.thread; |
118 if not thread or coroutine.status(thread) == "dead" then | 120 if not thread or coroutine.status(thread) == "dead" then |
| | 121 self:log("debug", "creating new coroutine"); |
119 -- Create a new coroutine for this runner | 122 -- Create a new coroutine for this runner |
120 thread = runner_create_thread(self.func, self); | 123 thread = runner_create_thread(self.func, self); |
121 self.thread = thread; | 124 self.thread = thread; |
122 end | 125 end |
123 | 126 |
124 -- Process task item(s) while the queue is not empty, and we're not blocked | 127 -- Process task item(s) while the queue is not empty, and we're not blocked |
125 local n, state, err = #q, self.state, nil; | 128 local n, state, err = #q, self.state, nil; |
126 self.state = "running"; | 129 self.state = "running"; |
| | 130 self:log("debug", "running main loop"); |
127 while n > 0 and state == "ready" do | 131 while n > 0 and state == "ready" do |
128 local consumed; | 132 local consumed; |
129 -- Loop through queue items, and attempt to run them | 133 -- Loop through queue items, and attempt to run them |
130 for i = 1,n do | 134 for i = 1,n do |
131 local queued_input = q[i]; | 135 local queued_input = q[i]; |
154 n = #q; | 158 n = #q; |
155 end | 159 end |
156 -- Runner processed all items it can, so save current runner state | 160 -- Runner processed all items it can, so save current runner state |
157 self.state = state; | 161 self.state = state; |
158 if err or state ~= self.notified_state then | 162 if err or state ~= self.notified_state then |
| | 163 self:log("debug", "changed state from %s to %s", self.notified_state, err and "error" or state); |
159 if err then | 164 if err then |
160 state = "error" | 165 state = "error" |
161 else | 166 else |
162 self.notified_state = state; | 167 self.notified_state = state; |
163 end | 168 end |
170 -- Add a task item to the queue without invoking the runner, even if it is idle | 175 -- Add a task item to the queue without invoking the runner, even if it is idle |
171 function runner_mt:enqueue(input) | 176 function runner_mt:enqueue(input) |
172 table.insert(self.queue, input); | 177 table.insert(self.queue, input); |
173 end | 178 end |
174 | 179 |
| | 180 function runner_mt:log(level, fmt, ...) |
| | 181 return log(level, "[runner %s] "..fmt, self.id, ...); |
| | 182 end |
| | 183 |
175 return { waiter = waiter, guarder = guarder, runner = runner }; | 184 return { waiter = waiter, guarder = guarder, runner = runner }; |