Software /
code /
prosody
Comparison
teal-src/util/smqueue.tl @ 12055:daced16154fa
util.smqueue: Abstract queue with acknowledgements and overflow
Meant to be used in mod_smacks for XEP-0198
Meant to have a larger virtual size than the actual number of items stored,
on the theory that in most cases the excess will be acked before it is needed
for a resumption event.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Tue, 14 Dec 2021 19:58:53 +0100 |
child | 12057:e880f5a13080 |
comparison
equal
deleted
inserted
replaced
12054:0116fa57f05c | 12055:daced16154fa |
---|---|
1 local queue = require "util.queue"; | |
2 | |
-- Type declarations for the module table and for the queue objects it creates.
local record lib
	-- T would typically be util.stanza
	record smqueue<T>
		-- Backing util.queue that holds the items still stored
		_queue : queue.queue<T>
		-- Incremented by one on every push()
		_head : integer
		-- Counter value passed to the latest ack() that passed its range checks
		_tail : integer

		-- Failure reasons returned as the second value of ack():
		--   "tail": the acked counter is lower than _tail
		--   "head": the acked counter is higher than _head
		--   "pop":  the backing queue yielded no item while draining
		enum ack_errors
			"tail"
			"head"
			"pop"
		end
		push : function (smqueue, T)
		ack : function (smqueue, integer) : { T }, ack_errors
		-- Counters; count_unacked also backs the length operator
		count_unacked : function (smqueue<T>) : integer
		count_acked : function (smqueue<T>) : integer
		resumable : function (smqueue) : boolean
		-- Iterator triple over the backing queue
		resume : function (smqueue<T>) : queue.queue.iterator, any, integer
		type consume_iter = function (smqueue<T>) : T
		consume : function (smqueue<T>) : consume_iter

		-- Array-like compatibility view
		table : function (smqueue<T>) : { T }
	end
	-- Constructor; the integer is the capacity of the backing queue
	new : function <T>(integer) : smqueue<T>
end

local type smqueue = lib.smqueue;
27 | |
-- Store an item in the backing queue and advance the head counter.
-- v: the item to enqueue.
-- Raises (via assert) if the backing queue refuses the item.
function smqueue:push(v)
	-- Wraps instead of errors
	assert(self._queue:push(v));
	-- Count the item only after it was accepted, so a failed push cannot
	-- leave _head out of step with what is actually stored.
	self._head = self._head + 1;
end
33 | |
-- Acknowledge everything up to counter value `h`.
-- Returns the list of items removed from the backing queue, or nil plus
-- one of "tail" (h is below the current tail), "head" (h is above the
-- current head) or "pop" (the backing queue returned nothing mid-drain).
function smqueue:ack(h : integer) : { any }, smqueue.ack_errors
	if h < self._tail then
		return nil, "tail";
	end
	if h > self._head then
		return nil, "head";
	end

	local q = self._queue;
	local released : { any } = {};
	self._tail = h;
	-- Number of items that remain unacknowledged after this ack
	local outstanding = self._head - h;
	-- Drain until the queue holds no more than the outstanding items
	while q:count() > outstanding do
		local item = q:pop();
		if not item then
			return nil, "pop";
		end
		released[#released + 1] = item;
	end
	return released;
end
51 | |
-- Number of pushed items that have not been acknowledged yet
-- (head counter minus tail counter).
function smqueue:count_unacked() : integer
	local head, tail = self._head, self._tail;
	return head - tail;
end
55 | |
-- Current value of the tail counter, i.e. the last acknowledged count.
function smqueue:count_acked() : integer
	local acked = self._tail;
	return acked;
end
59 | |
-- True when the backing queue still holds at least as many items as are
-- unacknowledged.
function smqueue:resumable() : boolean
	local outstanding = self._head - self._tail;
	return outstanding <= self._queue:count();
end
63 | |
-- Hands back whatever the backing queue's items() returns, for use in a
-- generic for loop.
function smqueue:resume() : queue.queue.iterator, any, integer
	local q = self._queue;
	return q:items();
end
67 | |
-- Hands back the backing queue's consume() iterator.
function smqueue:consume() : queue.queue.consume_iter
	local q = self._queue;
	return q:consume();
end
71 | |
-- Compatibility wrapper, meant to look like a plain ol' array
-- Instances are tables of the form { _queue = <smqueue> } with this record
-- as their metatable.
local record compat_mt
	_queue : smqueue<any>
end

-- Index lookup: anything below the tail counter yields nil, otherwise the
-- slot is read straight from the backing queue's item storage.
-- NOTE(review): depends on util.queue internals (_items, size); the slot
-- arithmetic cannot be verified from this file alone.
function compat_mt:__index(i : integer) : any
	local sq = self._queue;
	if i < sq._tail then
		return nil;
	end
	local backing = sq._queue;
	local slot = (i + sq._tail) % backing.size;
	return backing._items[slot];
end

-- Length operator: the number of unacknowledged items.
function compat_mt:__len() : integer
	local sq = self._queue;
	return sq:count_unacked();
end
85 | |
-- Build an array-like view of this queue: a fresh table that refers back to
-- the queue and uses compat_mt as its metatable.
function smqueue:table() : { any }
	local view : { any } = setmetatable({ _queue = self }, compat_mt);
	return view;
end
89 | |
-- Snapshot of the two counters only; the stored items are not included.
local function freeze(q : smqueue<any>) : { string:integer }
	local head, tail = q._head, q._tail;
	return { head = head, tail = tail };
end
93 | |
-- Metatable shared by every object returned from lib.new
local queue_mt = {
	__name = "smqueue",
	-- Method lookup falls through to the smqueue functions
	__index = smqueue,
	-- #q reports the number of unacknowledged items
	__len = smqueue.count_unacked,
	-- Counter snapshot, see freeze()
	__freeze = freeze,
}
101 | |
-- Create a new queue with both counters at zero.
-- size: capacity handed to util.queue; must be greater than zero.
-- The second argument to queue.new is passed as true (presumably enabling
-- wrap-around on overflow; confirm against util.queue).
-- Returns an smqueue, as declared in the lib record, rather than a bare
-- util.queue.
function lib.new<T>(size : integer) : smqueue<T>
	assert(size>0);
	return setmetatable({ _head = 0; _tail = 0; _queue = queue.new(size, true) }, queue_mt);
end

return lib;