Software / code / prosody
Comparison
util/smqueue.lua @ 12055:daced16154fa
util.smqueue: Abstract queue with acknowledgements and overflow
Meant to be used in mod_smacks for XEP-0198
Meant to have a larger virtual size than the actual number of items stored,
on the theory that in most cases the excess will be acked before it is needed
for a resumption event.
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Tue, 14 Dec 2021 19:58:53 +0100 |
| child | 12058:4860da718e87 |
comparison
equal
deleted
inserted
replaced
| 12054:0116fa57f05c | 12055:daced16154fa |
|---|---|
| 1 local queue = require("util.queue"); | |
| 2 | |
-- Method table for smqueue objects, exported as lib.smqueue so the same
-- table is reachable both through the module and through the short local.
local smqueue = {};

local lib = { smqueue = smqueue };
| 6 | |
--- Store an item in the backing queue and advance the head counter.
-- The counter is bumped only after the backing queue has accepted the item,
-- so a failed push (which raises through assert) cannot leave _head counting
-- an item that was never stored.
-- @param v item to store
function smqueue:push(v)
	assert(self._queue:push(v));
	self._head = self._head + 1;
end
| 12 | |
--- Acknowledge everything up to counter value h.
-- Moves the tail counter to h and pops items from the backing queue until it
-- holds no more than the number still unacknowledged.
-- @param h counter value to acknowledge up to
-- @return list of the popped items, or nil plus a reason string:
--   "tail" when h is below the current tail, "head" when h is above the
--   current head, "pop" when the backing queue returned a false value while
--   more items were still expected.
function smqueue:ack(h)
	if h < self._tail then return nil, "tail" end
	if h > self._head then return nil, "head" end

	self._tail = h;
	local remaining = self._head - h;
	local released = {};
	-- Drain from the front until only the unacknowledged items are left.
	while self._queue:count() > remaining do
		local item = self._queue:pop();
		if not item then return nil, "pop" end
		released[#released + 1] = item;
	end
	return released
end
| 30 | |
--- Number of items pushed but not yet acknowledged (head minus tail).
function smqueue:count_unacked()
	local head, tail = self._head, self._tail;
	return head - tail
end
| 32 | |
--- Current value of the tail counter, i.e. how much has been acknowledged.
function smqueue:count_acked()
	local tail = self._tail;
	return tail
end
| 34 | |
--- Whether the backing queue still holds at least as many items as are
-- currently unacknowledged.
-- @return boolean
function smqueue:resumable()
	local stored = self._queue:count();
	local unacked = self._head - self._tail;
	return stored >= unacked
end
| 36 | |
--- Hand back whatever the backing queue's items() returns.
-- NOTE(review): presumably a non-destructive iterator over stored items;
-- confirm against util.queue.
function smqueue:resume()
	local backing = self._queue;
	return backing:items()
end
| 38 | |
--- Hand back whatever the backing queue's consume() returns.
-- NOTE(review): presumably an iterator that removes items as it goes;
-- confirm against util.queue.
function smqueue:consume()
	local backing = self._queue;
	return backing:consume()
end
| 40 | |
-- Metatable for the table-like view returned by smqueue:table(). The view
-- keeps the smqueue it wraps in its _queue field.
local compat_mt = {}

--- Index lookup on the view.
-- Keys that are not numbers yield nil instead of raising a comparison error,
-- so probing the view with an arbitrary field name behaves like a plain
-- table lookup. Numeric keys below the tail counter also yield nil.
-- @param i key being looked up
-- @return the stored item, or nil
function compat_mt:__index(i)
	if type(i) ~= "number" then return nil end
	local smq = self._queue;
	if i < smq._tail then return nil end
	local backing = smq._queue;
	-- NOTE(review): this reads the backing queue's internal _items and size
	-- fields directly; confirm the slot arithmetic against util.queue.
	return backing._items[(i + smq._tail) % backing.size]
end

--- Length of the view: the wrapped queue's count of unacknowledged items.
function compat_mt:__len() return self._queue:count_unacked() end
| 49 | |
--- Wrap this queue in a table-like view whose indexing and length are
-- provided by compat_mt.
function smqueue:table()
	local view = { _queue = self };
	return setmetatable(view, compat_mt)
end
| 51 | |
-- Build a plain table holding just the two counters of q; installed as the
-- __freeze metamethod of smqueue objects.
local function freeze(q)
	local snapshot = {};
	snapshot.head = q._head;
	snapshot.tail = q._tail;
	return snapshot
end
| 53 | |
-- Metatable shared by every smqueue instance: methods come from smqueue,
-- the length operator reports the unacknowledged count.
local queue_mt = {
	__name = "smqueue",
	__index = smqueue,
	__len = smqueue.count_unacked,
	__freeze = freeze,
}
| 55 | |
--- Create a new smqueue with both counters at zero.
-- @param size capacity passed to the backing util.queue; must be > 0
-- @return a table with the smqueue metatable attached
function lib.new(size)
	assert(size > 0);
	-- NOTE(review): the second argument presumably lets the backing queue
	-- overwrite old items when full; confirm against util.queue.
	local backing = queue.new(size, true);
	local instance = { _head = 0; _tail = 0; _queue = backing };
	return setmetatable(instance, queue_mt)
end
| 60 | |
| 61 return lib |