Software / code / prosody
Comparison
teal-src/util/smqueue.tl @ 12055:daced16154fa
util.smqueue: Abstract queue with acknowledgements and overflow
Meant to be used in mod_smacks for XEP-0198
It is meant to have a larger virtual size than the actual number of items stored,
on the theory that in most cases the excess will have been acked before it is
needed for a resumption event.
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Tue, 14 Dec 2021 19:58:53 +0100 |
| child | 12057:e880f5a13080 |
comparison
equal
deleted
inserted
replaced
| 12054:0116fa57f05c | 12055:daced16154fa |
|---|---|
| 1 local queue = require "util.queue"; | |
| 2 | |
local record lib
	-- Queue with acknowledgements: counts every pushed item and every
	-- acknowledged item, while the items themselves live in a util.queue.
	-- T would typically be util.stanza
	record smqueue<T>
		_queue : queue.queue<T> -- backing store for the retained items
		_head : integer -- incremented by one on every push()
		_tail : integer -- the last value accepted by ack()

		-- Failure reasons returned as the second value of ack()
		enum ack_errors
			"tail" -- the acked count is lower than what was acked before
			"head" -- the acked count is higher than the number of pushes
			"pop" -- the backing queue returned nothing while discarding
		end
		push : function (smqueue, T)
		ack : function (smqueue, integer) : { T }, ack_errors
		-- The next three are implemented below; declared here so that typed
		-- consumers of the module can see them.
		count_unacked : function (smqueue<T>) : integer
		count_acked : function (smqueue<T>) : integer
		resumable : function (smqueue) : boolean
		resume : function (smqueue<T>) : queue.queue.iterator, any, integer
		type consume_iter = function (smqueue<T>) : T
		consume : function (smqueue<T>) : consume_iter

		table : function (smqueue<T>) : { T }
	end
	-- Creates an empty smqueue; the integer is the backing queue size.
	new : function <T>(integer) : smqueue<T>
end

local type smqueue = lib.smqueue;
| 27 | |
-- Counts one more pushed item and hands `v` to the backing queue.
function smqueue:push(v)
	local bumped = self._head + 1;
	self._head = bumped;
	-- Wraps instead of errors
	assert(self._queue:push(v));
end
| 33 | |
-- Acknowledges everything up to the absolute counter value `h`.
--
-- Returns the array of items popped from the backing queue, or nil plus
-- one of the ack_errors:
--   "tail"  h is below the current tail counter
--   "head"  h is above the current head counter
--   "pop"   the backing queue returned a false value while discarding
function smqueue:ack(h : integer) : { any }, smqueue.ack_errors
	if h < self._tail then
		return nil, "tail";
	end
	if h > self._head then
		return nil, "head";
	end

	self._tail = h;

	local backing = self._queue;
	-- Number of items that should still be held once the ack is applied.
	local remaining = self._head - self._tail;
	local acked = {};
	while backing:count() > remaining do
		local item = backing:pop();
		if not item then
			return nil, "pop";
		end
		table.insert(acked, item);
	end
	return acked;
end
| 51 | |
-- Difference between the head and tail counters, i.e. how many pushed
-- items have not been acknowledged yet.
function smqueue:count_unacked() : integer
	local head, tail = self._head, self._tail;
	return head - tail;
end
| 55 | |
-- Current value of the tail counter, i.e. the last value accepted by ack().
function smqueue:count_acked() : integer
	local acked_total = self._tail;
	return acked_total;
end
| 59 | |
-- True while the backing queue still holds at least as many items as are
-- unacknowledged.
function smqueue:resumable() : boolean
	local unacked = self._head - self._tail;
	return unacked <= self._queue:count();
end
| 63 | |
-- Returns whatever the backing queue's items() returns, for use in a
-- generic `for` loop.
function smqueue:resume() : queue.queue.iterator, any, integer
	local backing = self._queue;
	return backing:items();
end
| 67 | |
-- Returns the backing queue's consume() iterator unchanged.
-- The head and tail counters are not touched here.
function smqueue:consume() : queue.queue.consume_iter
	local backing = self._queue;
	return backing:consume();
end
| 71 | |
-- Compatibility helper, meant to look like a plain ol' array.
--
-- Returns a fresh array holding the items currently stored in the backing
-- queue, in the order its items() iterator yields them (presumably oldest
-- first -- confirm against util.queue).
--
-- This replaces a metatable proxy whose __index compared the relative
-- index with the absolute tail counter, so valid positions read as nil
-- once anything had been acked, and which indexed the backing queue's
-- private storage directly. A real array also works with ipairs() and `#`
-- without relying on metamethods.
--
-- The result is a snapshot: later push()/ack() calls do not change it.
-- Its length is the number of stored items, which can be lower than
-- count_unacked() when the queue is no longer resumable.
function smqueue:table() : { any }
	local snapshot : { any } = {};
	for _, item in self._queue:items() do
		table.insert(snapshot, item);
	end
	return snapshot;
end
| 89 | |
-- Reduces a queue to its two counters; installed as __freeze in queue_mt.
local function freeze(q : smqueue<any>) : { string:integer }
	local head, tail = q._head, q._tail;
	return { head = head, tail = tail };
end
| 93 | |
-- Metatable attached to every queue created by lib.new
local queue_mt = {
	__name = "smqueue",
	__index = smqueue, -- method lookup
	__len = smqueue.count_unacked, -- #q is the number of unacked items
	__freeze = freeze, -- NOTE(review): presumably read by a serializer; confirm
};
| 101 | |
-- Creates an empty smqueue backed by a util.queue of `size` slots.
-- `size` must be positive, otherwise the assertion fails.
-- The return type is smqueue<T>, matching the `new` declaration in the lib
-- record; it was previously annotated as queue.queue<T>, although the
-- returned table has the smqueue fields and metatable.
function lib.new<T>(size : integer) : smqueue<T>
	assert(size>0);
	-- `true` is passed as the second argument of queue.new, which
	-- smqueue:push describes as "wraps instead of errors".
	return setmetatable({ _head = 0; _tail = 0; _queue = queue.new(size, true) }, queue_mt);
end
| 106 | |
| 107 return lib; |