Comparison

teal-src/util/smqueue.tl @ 12055:daced16154fa

util.smqueue: Abstract queue with acknowledgements and overflow Meant to be used in mod_smacks for XEP-0198 Meant to have a larger virtual size than actual number of items stored, on the theory that in most cases, the excess will be acked before needed for a resumption event.
author Kim Alvefur <zash@zash.se>
date Tue, 14 Dec 2021 19:58:53 +0100
child 12057:e880f5a13080
comparison
equal deleted inserted replaced
12054:0116fa57f05c 12055:daced16154fa
1 local queue = require "util.queue";
2
3 local record lib
4 -- T would typically be util.stanza
5 record smqueue<T>
6 _queue : queue.queue<T>
7 _head : integer
8 _tail : integer
9
10 enum ack_errors
11 "tail"
12 "head"
13 "pop"
14 end
15 push : function (smqueue, T)
16 ack : function (smqueue, integer) : { T }, ack_errors
17 resumable : function (smqueue) : boolean
18 type consume_iter = function (smqueue<T>) : T
19 consume : function (smqueue<T>) : consume_iter
20
21 table : function (smqueue<T>) : { T }
22 end
23 new : function <T>(integer) : smqueue<T>
24 end
25
26 local type smqueue = lib.smqueue;
27
28 function smqueue:push(v)
29 self._head = self._head + 1;
30 -- Wraps instead of errors
31 assert(self._queue:push(v));
32 end
33
34 function smqueue:ack(h : integer) : { any }, smqueue.ack_errors
35 if h < self._tail then
36 return nil, "tail";
37 elseif h > self._head then
38 return nil, "head";
39 end
40 -- TODO optimize? cache table fields
41 local acked = {};
42 self._tail = h;
43 local expect = self._head - self._tail;
44 while expect < self._queue:count() do
45 local v = self._queue:pop();
46 if not v then return nil, "pop"; end
47 table.insert(acked, v);
48 end
49 return acked;
50 end
51
52 function smqueue:count_unacked() : integer
53 return self._head - self._tail;
54 end
55
56 function smqueue:count_acked() : integer
57 return self._tail;
58 end
59
60 function smqueue:resumable() : boolean
61 return self._queue:count() >= (self._head - self._tail);
62 end
63
64 function smqueue:resume() : queue.queue.iterator, any, integer
65 return self._queue:items();
66 end
67
68 function smqueue:consume() : queue.queue.consume_iter
69 return self._queue:consume()
70 end
71
72 -- Compatibility wrapper, meant to look like a plain ol' array
73 local record compat_mt
74 _queue : smqueue<any>
75 end
76
77 function compat_mt:__index(i : integer) : any
78 if i < self._queue._tail then return nil end
79 return self._queue._queue._items[(i + self._queue._tail) % self._queue._queue.size];
80 end
81
82 function compat_mt:__len() : integer
83 return self._queue:count_unacked()
84 end
85
86 function smqueue:table() : { any }
87 return setmetatable({ _queue = self }, compat_mt);
88 end
89
90 local function freeze(q : smqueue<any>) : { string:integer }
91 return { head = q._head, tail = q._tail }
92 end
93
94 local queue_mt = {
95 --
96 __name = "smqueue";
97 __index = smqueue;
98 __len = smqueue.count_unacked;
99 __freeze = freeze;
100 }
101
102 function lib.new<T>(size : integer) : queue.queue<T>
103 assert(size>0);
104 return setmetatable({ _head = 0; _tail = 0; _queue = queue.new(size, true) }, queue_mt);
105 end
106
107 return lib;