Software /
code /
prosody
Comparison
util/async.lua @ 5788:3556f338caa3
util.async: New library to provide support around coroutine-based non-blocking functions
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 11 Aug 2013 14:46:07 +0100 |
child | 5790:959163e4d631 |
comparison
equal
deleted
inserted
replaced
5787:9a22586f67eb | 5788:3556f338caa3 |
---|---|
1 local log = require "util.logger".init("util.async"); | |
2 | |
3 local function runner_continue(thread) | |
4 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure) | |
5 if coroutine.status(thread) ~= "suspended" then -- This should suffice | |
6 return false; | |
7 end | |
8 local ok, state, runner = coroutine.resume(thread); | |
9 if not ok then | |
10 local level = 0; | |
11 while debug.getinfo(thread, level, "") do level = level + 1; end | |
12 ok, runner = debug.getlocal(thread, level-1, 1); | |
13 local error_handler = runner.watchers.error; | |
14 if error_handler then error_handler(runner, debug.traceback(thread, state)); end | |
15 elseif state == "ready" then | |
16 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'. | |
17 -- We also have to :run(), because the queue might have further items that will not be | |
18 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer). | |
19 runner.state = "ready"; | |
20 runner:run(); | |
21 end | |
22 return true; | |
23 end | |
24 | |
25 function waiter(num) | |
26 local thread = coroutine.running(); | |
27 if not thread then | |
28 error("Not running in an async context, see http://prosody.im/doc/developers/async"); | |
29 end | |
30 num = num or 1; | |
31 return function () | |
32 coroutine.yield("wait"); | |
33 end, function () | |
34 num = num - 1; | |
35 if num == 0 then | |
36 if not runner_continue(thread) then | |
37 error("done() called without wait()!"); | |
38 end | |
39 end | |
40 end; | |
41 end | |
42 | |
43 local runner_mt = {}; | |
44 runner_mt.__index = runner_mt; | |
45 | |
46 local function runner_create_thread(func, self) | |
47 local thread = coroutine.create(function (self) | |
48 while true do | |
49 func(coroutine.yield("ready", self)); | |
50 end | |
51 end); | |
52 assert(coroutine.resume(thread, self)); -- Start it up, it will return instantly to wait for the first input | |
53 return thread; | |
54 end | |
55 | |
56 local empty_watchers = {}; | |
57 function runner(func, watchers, data) | |
58 return setmetatable({ func = func, thread = false, state = "ready", notified_state = "ready", | |
59 queue = {}, watchers = watchers or empty_watchers, data = data } | |
60 , runner_mt); | |
61 end | |
62 | |
63 function runner_mt:run(input) | |
64 if input ~= nil then | |
65 table.insert(self.queue, input); | |
66 end | |
67 if self.state ~= "ready" then | |
68 return true, self.state, #self.queue; | |
69 end | |
70 | |
71 local q, thread = self.queue, self.thread; | |
72 if not thread or coroutine.status(thread) == "dead" then | |
73 thread = runner_create_thread(self.func, self); | |
74 self.thread = thread; | |
75 end | |
76 | |
77 local n, state, err = #q, self.state, nil; | |
78 self.state = "running"; | |
79 while n > 0 and state == "ready" do | |
80 local consumed; | |
81 for i = 1,n do | |
82 local input = q[i]; | |
83 local ok, new_state = coroutine.resume(thread, input); | |
84 if not ok then | |
85 consumed, state, err = i, "ready", debug.traceback(thread, new_state); | |
86 self.thread = nil; | |
87 break; | |
88 elseif state == "wait" then | |
89 consumed, state = i, "waiting"; | |
90 break; | |
91 end | |
92 end | |
93 if not consumed then consumed = n; end | |
94 if q[n+1] ~= nil then | |
95 n = #q; | |
96 end | |
97 for i = 1, n do | |
98 q[i] = q[consumed+i]; | |
99 end | |
100 n = #q; | |
101 end | |
102 self.state = state; | |
103 if state ~= self.notified_state then | |
104 self.notified_state = state; | |
105 local handler = self.watchers[state]; | |
106 if handler then handler(self, err); end | |
107 end | |
108 return true, state, n; | |
109 end | |
110 | |
111 function runner_mt:enqueue(input) | |
112 table.insert(self.queue, input); | |
113 end | |
114 | |
115 return { waiter = waiter, runner = runner }; |