Comparison

util/async.lua @ 7436:649b89b2c840

util.async: Add some more comments for clarity
author Matthew Wild <mwild1@gmail.com>
date Sun, 22 May 2016 20:06:12 +0100
parent 7359:a5a080c12c96
child 7724:20a69ef5570c
comparison
equal deleted inserted replaced
7434:3b16f2802ef0 7436:649b89b2c840
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice 5 if coroutine.status(thread) ~= "suspended" then -- This should suffice
6 return false; 6 return false;
7 end 7 end
8 local ok, state, runner = coroutine.resume(thread); 8 local ok, state, runner = coroutine.resume(thread);
9 if not ok then 9 if not ok then
10 -- Running the coroutine failed, which means we have to find the runner manually,
11 -- in order to inform the error handler
10 local level = 0; 12 local level = 0;
11 while debug.getinfo(thread, level, "") do level = level + 1; end 13 while debug.getinfo(thread, level, "") do level = level + 1; end
12 ok, runner = debug.getlocal(thread, level-1, 1); 14 ok, runner = debug.getlocal(thread, level-1, 1);
13 local error_handler = runner.watchers.error; 15 local error_handler = runner.watchers.error;
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end 16 if error_handler then error_handler(runner, debug.traceback(thread, state)); end
97 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", 99 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready",
98 queue = {}, watchers = watchers or empty_watchers, data = data } 100 queue = {}, watchers = watchers or empty_watchers, data = data }
99 , runner_mt); 101 , runner_mt);
100 end 102 end
101 103
104 -- Add a task item for the runner to process
102 function runner_mt:run(input) 105 function runner_mt:run(input)
103 if input ~= nil then 106 if input ~= nil then
104 table.insert(self.queue, input); 107 table.insert(self.queue, input);
105 end 108 end
106 if self.state ~= "ready" then 109 if self.state ~= "ready" then
110 -- The runner is busy. Indicate that the task item has been
111 -- queued, and return information about the current runner state
107 return true, self.state, #self.queue; 112 return true, self.state, #self.queue;
108 end 113 end
109 114
110 local q, thread = self.queue, self.thread; 115 local q, thread = self.queue, self.thread;
111 if not thread or coroutine.status(thread) == "dead" then 116 if not thread or coroutine.status(thread) == "dead" then
117 -- Create a new coroutine for this runner
112 thread = runner_create_thread(self.func, self); 118 thread = runner_create_thread(self.func, self);
113 self.thread = thread; 119 self.thread = thread;
114 end 120 end
115 121
122 -- Process task item(s) while the queue is not empty, and we're not blocked
116 local n, state, err = #q, self.state, nil; 123 local n, state, err = #q, self.state, nil;
117 self.state = "running"; 124 self.state = "running";
118 while n > 0 and state == "ready" do 125 while n > 0 and state == "ready" do
119 local consumed; 126 local consumed;
127 -- Loop through queue items, and attempt to run them
120 for i = 1,n do 128 for i = 1,n do
121 local input = q[i]; 129 local input = q[i];
122 local ok, new_state = coroutine.resume(thread, input); 130 local ok, new_state = coroutine.resume(thread, input);
123 if not ok then 131 if not ok then
132 -- There was an error running the coroutine: save the error and mark the runner as ready to begin again
124 consumed, state, err = i, "ready", debug.traceback(thread, new_state); 133 consumed, state, err = i, "ready", debug.traceback(thread, new_state);
125 self.thread = nil; 134 self.thread = nil;
126 break; 135 break;
127 elseif new_state == "wait" then 136 elseif new_state == "wait" then
137 -- Runner is blocked on waiting for a task item to complete
128 consumed, state = i, "waiting"; 138 consumed, state = i, "waiting";
129 break; 139 break;
130 end 140 end
131 end 141 end
142 -- Loop ended: either the queue is empty because every task ran without blocking (consumed == nil),
143 -- or the runner blocked/errored, in which case consumed holds the number of tasks processed so far
132 if not consumed then consumed = n; end 144 if not consumed then consumed = n; end
145 -- Remove consumed items from the queue array
133 if q[n+1] ~= nil then 146 if q[n+1] ~= nil then
134 n = #q; 147 n = #q;
135 end 148 end
136 for i = 1, n do 149 for i = 1, n do
137 q[i] = q[consumed+i]; 150 q[i] = q[consumed+i];
138 end 151 end
139 n = #q; 152 n = #q;
140 end 153 end
154 -- Runner processed all items it can, so save current runner state
141 self.state = state; 155 self.state = state;
142 if err or state ~= self.notified_state then 156 if err or state ~= self.notified_state then
143 if err then 157 if err then
144 state = "error" 158 state = "error"
145 else 159 else
149 if handler then handler(self, err); end 163 if handler then handler(self, err); end
150 end 164 end
151 return true, state, n; 165 return true, state, n;
152 end 166 end
153 167
168 -- Add a task item to the queue without invoking the runner, even if it is idle
154 function runner_mt:enqueue(input) 169 function runner_mt:enqueue(input)
155 table.insert(self.queue, input); 170 table.insert(self.queue, input);
156 end 171 end
157 172
158 return { waiter = waiter, guarder = guarder, runner = runner }; 173 return { waiter = waiter, guarder = guarder, runner = runner };