Software / code / prosody
Comparison
util/async.lua @ 6054:7a5ddbaf758d
Merge 0.9->0.10
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Wed, 02 Apr 2014 17:41:38 +0100 |
| parent | 5797:a493b79cfad0 |
| child | 7359:a5a080c12c96 |
comparison
equal
deleted
inserted
replaced
| 6053:2f93a04564b2 | 6054:7a5ddbaf758d |
|---|---|
| 1 local log = require "util.logger".init("util.async"); | |
| 2 | |
| 3 local function runner_continue(thread) | |
| 4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) | |
| 5 if coroutine.status(thread) ~= "suspended" then -- This should suffice | |
| 6 return false; | |
| 7 end | |
| 8 local ok, state, runner = coroutine.resume(thread); | |
| 9 if not ok then | |
| 10 local level = 0; | |
| 11 while debug.getinfo(thread, level, "") do level = level + 1; end | |
| 12 ok, runner = debug.getlocal(thread, level-1, 1); | |
| 13 local error_handler = runner.watchers.error; | |
| 14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end | |
| 15 elseif state == "ready" then | |
| 16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. | |
| 17 -- We also have to :run(), because the queue might have further items that will not be | |
| 18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). | |
| 19 runner.state = "ready"; | |
| 20 runner:run(); | |
| 21 end | |
| 22 return true; | |
| 23 end | |
| 24 | |
| 25 local function waiter(num) | |
| 26 local thread = coroutine.running(); | |
| 27 if not thread then | |
| 28 error("Not running in an async context, see http://prosody.im/doc/developers/async"); | |
| 29 end | |
| 30 num = num or 1; | |
| 31 local waiting; | |
| 32 return function () | |
| 33 if num == 0 then return; end -- already done | |
| 34 waiting = true; | |
| 35 coroutine.yield("wait"); | |
| 36 end, function () | |
| 37 num = num - 1; | |
| 38 if num == 0 and waiting then | |
| 39 runner_continue(thread); | |
| 40 elseif num < 0 then | |
| 41 error("done() called too many times"); | |
| 42 end | |
| 43 end; | |
| 44 end | |
| 45 | |
| 46 local function guarder() | |
| 47 local guards = {}; | |
| 48 return function (id, func) | |
| 49 local thread = coroutine.running(); | |
| 50 if not thread then | |
| 51 error("Not running in an async context, see http://prosody.im/doc/developers/async"); | |
| 52 end | |
| 53 local guard = guards[id]; | |
| 54 if not guard then | |
| 55 guard = {}; | |
| 56 guards[id] = guard; | |
| 57 log("debug", "New guard!"); | |
| 58 else | |
| 59 table.insert(guard, thread); | |
| 60 log("debug", "Guarded. %d threads waiting.", #guard) | |
| 61 coroutine.yield("wait"); | |
| 62 end | |
| 63 local function exit() | |
| 64 local next_waiting = table.remove(guard, 1); | |
| 65 if next_waiting then | |
| 66 log("debug", "guard: Executing next waiting thread (%d left)", #guard) | |
| 67 runner_continue(next_waiting); | |
| 68 else | |
| 69 log("debug", "Guard off duty.") | |
| 70 guards[id] = nil; | |
| 71 end | |
| 72 end | |
| 73 if func then | |
| 74 func(); | |
| 75 exit(); | |
| 76 return; | |
| 77 end | |
| 78 return exit; | |
| 79 end; | |
| 80 end | |
| 81 | |
| 82 local runner_mt = {}; | |
| 83 runner_mt.__index = runner_mt; | |
| 84 | |
| 85 local function runner_create_thread(func, self) | |
| 86 local thread = coroutine.create(function (self) | |
| 87 while true do | |
| 88 func(coroutine.yield("ready", self)); | |
| 89 end | |
| 90 end); | |
| 91 assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input | |
| 92 return thread; | |
| 93 end | |
| 94 | |
| 95 local empty_watchers = {}; | |
| 96 local function runner(func, watchers, data) | |
| 97 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", | |
| 98 queue = {}, watchers = watchers or empty_watchers, data = data } | |
| 99 , runner_mt); | |
| 100 end | |
| 101 | |
| 102 function runner_mt:run(input) | |
| 103 if input ~= nil then | |
| 104 table.insert(self.queue, input); | |
| 105 end | |
| 106 if self.state ~= "ready" then | |
| 107 return true, self.state, #self.queue; | |
| 108 end | |
| 109 | |
| 110 local q, thread = self.queue, self.thread; | |
| 111 if not thread or coroutine.status(thread) == "dead" then | |
| 112 thread = runner_create_thread(self.func, self); | |
| 113 self.thread = thread; | |
| 114 end | |
| 115 | |
| 116 local n, state, err = #q, self.state, nil; | |
| 117 self.state = "running"; | |
| 118 while n > 0 and state == "ready" do | |
| 119 local consumed; | |
| 120 for i = 1,n do | |
| 121 local input = q[i]; | |
| 122 local ok, new_state = coroutine.resume(thread, input); | |
| 123 if not ok then | |
| 124 consumed, state, err = i, "ready", debug.traceback(thread, new_state); | |
| 125 self.thread = nil; | |
| 126 break; | |
| 127 elseif new_state == "wait" then | |
| 128 consumed, state = i, "waiting"; | |
| 129 break; | |
| 130 end | |
| 131 end | |
| 132 if not consumed then consumed = n; end | |
| 133 if q[n+1] ~= nil then | |
| 134 n = #q; | |
| 135 end | |
| 136 for i = 1, n do | |
| 137 q[i] = q[consumed+i]; | |
| 138 end | |
| 139 n = #q; | |
| 140 end | |
| 141 self.state = state; | |
| 142 if err or state ~= self.notified_state then | |
| 143 if err then | |
| 144 state = "error" | |
| 145 else | |
| 146 self.notified_state = state; | |
| 147 end | |
| 148 local handler = self.watchers[state]; | |
| 149 if handler then handler(self, err); end | |
| 150 end | |
| 151 return true, state, n; | |
| 152 end | |
| 153 | |
| 154 function runner_mt:enqueue(input) | |
| 155 table.insert(self.queue, input); | |
| 156 end | |
| 157 | |
| 158 return { waiter = waiter, guarder = guarder, runner = runner }; |