Software / code / prosody
Comparison
util/async.lua @ 7436:649b89b2c840
util.async: Add some more comments for clarity
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 22 May 2016 20:06:12 +0100 |
parent | 7359:a5a080c12c96 |
child | 7724:20a69ef5570c |
Comparison legend: equal, deleted, inserted, replaced
7434:3b16f2802ef0 | 7436:649b89b2c840 |
---|---|
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice | 5 if coroutine.status(thread) ~= "suspended" then -- This should suffice |
6 return false; | 6 return false; |
7 end | 7 end |
8 local ok, state, runner = coroutine.resume(thread); | 8 local ok, state, runner = coroutine.resume(thread); |
9 if not ok then | 9 if not ok then |
| | 10 -- Running the coroutine failed, which means we have to find the runner manually, |
| | 11 -- in order to inform the error handler |
10 local level = 0; | 12 local level = 0; |
11 while debug.getinfo(thread, level, "") do level = level + 1; end | 13 while debug.getinfo(thread, level, "") do level = level + 1; end |
12 ok, runner = debug.getlocal(thread, level-1, 1); | 14 ok, runner = debug.getlocal(thread, level-1, 1); |
13 local error_handler = runner.watchers.error; | 15 local error_handler = runner.watchers.error; |
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end | 16 if error_handler then error_handler(runner, debug.traceback(thread, state)); end |
97 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", | 99 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", |
98 queue = {}, watchers = watchers or empty_watchers, data = data } | 100 queue = {}, watchers = watchers or empty_watchers, data = data } |
99 , runner_mt); | 101 , runner_mt); |
100 end | 102 end |
101 | 103 |
| | 104 -- Add a task item for the runner to process |
102 function runner_mt:run(input) | 105 function runner_mt:run(input) |
103 if input ~= nil then | 106 if input ~= nil then |
104 table.insert(self.queue, input); | 107 table.insert(self.queue, input); |
105 end | 108 end |
106 if self.state ~= "ready" then | 109 if self.state ~= "ready" then |
| | 110 -- The runner is busy. Indicate that the task item has been |
| | 111 -- queued, and return information about the current runner state |
107 return true, self.state, #self.queue; | 112 return true, self.state, #self.queue; |
108 end | 113 end |
109 | 114 |
110 local q, thread = self.queue, self.thread; | 115 local q, thread = self.queue, self.thread; |
111 if not thread or coroutine.status(thread) == "dead" then | 116 if not thread or coroutine.status(thread) == "dead" then |
| | 117 -- Create a new coroutine for this runner |
112 thread = runner_create_thread(self.func, self); | 118 thread = runner_create_thread(self.func, self); |
113 self.thread = thread; | 119 self.thread = thread; |
114 end | 120 end |
115 | 121 |
| | 122 -- Process task item(s) while the queue is not empty, and we're not blocked |
116 local n, state, err = #q, self.state, nil; | 123 local n, state, err = #q, self.state, nil; |
117 self.state = "running"; | 124 self.state = "running"; |
118 while n > 0 and state == "ready" do | 125 while n > 0 and state == "ready" do |
119 local consumed; | 126 local consumed; |
| | 127 -- Loop through queue items, and attempt to run them |
120 for i = 1,n do | 128 for i = 1,n do |
121 local input = q[i]; | 129 local input = q[i]; |
122 local ok, new_state = coroutine.resume(thread, input); | 130 local ok, new_state = coroutine.resume(thread, input); |
123 if not ok then | 131 if not ok then |
| | 132 -- There was an error running the coroutine, save the error, mark runner as ready to begin again |
124 consumed, state, err = i, "ready", debug.traceback(thread, new_state); | 133 consumed, state, err = i, "ready", debug.traceback(thread, new_state); |
125 self.thread = nil; | 134 self.thread = nil; |
126 break; | 135 break; |
127 elseif new_state == "wait" then | 136 elseif new_state == "wait" then |
| | 137 -- Runner is blocked on waiting for a task item to complete |
128 consumed, state = i, "waiting"; | 138 consumed, state = i, "waiting"; |
129 break; | 139 break; |
130 end | 140 end |
131 end | 141 end |
| | 142 -- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil) |
| | 143 -- or runner is blocked/errored, and consumed will contain the number of tasks processed so far |
132 if not consumed then consumed = n; end | 144 if not consumed then consumed = n; end |
| | 145 -- Remove consumed items from the queue array |
133 if q[n+1] ~= nil then | 146 if q[n+1] ~= nil then |
134 n = #q; | 147 n = #q; |
135 end | 148 end |
136 for i = 1, n do | 149 for i = 1, n do |
137 q[i] = q[consumed+i]; | 150 q[i] = q[consumed+i]; |
138 end | 151 end |
139 n = #q; | 152 n = #q; |
140 end | 153 end |
| | 154 -- Runner processed all items it can, so save current runner state |
141 self.state = state; | 155 self.state = state; |
142 if err or state ~= self.notified_state then | 156 if err or state ~= self.notified_state then |
143 if err then | 157 if err then |
144 state = "error" | 158 state = "error" |
145 else | 159 else |
149 if handler then handler(self, err); end | 163 if handler then handler(self, err); end |
150 end | 164 end |
151 return true, state, n; | 165 return true, state, n; |
152 end | 166 end |
153 | 167 |
| | 168 -- Add a task item to the queue without invoking the runner, even if it is idle |
154 function runner_mt:enqueue(input) | 169 function runner_mt:enqueue(input) |
155 table.insert(self.queue, input); | 170 table.insert(self.queue, input); |
156 end | 171 end |
157 | 172 |
158 return { waiter = waiter, guarder = guarder, runner = runner }; | 173 return { waiter = waiter, guarder = guarder, runner = runner }; |