Comparison

util/pubsub.lua @ 7695:56ce32cfd6d9

util.pubsub: Switch to use util.cache for item data
author Kim Alvefur <zash@zash.se>
date Sun, 16 Oct 2016 00:30:02 +0200
parent 6777:5de6b93d0190
child 7696:1c410b4f3a58
comparison
equal deleted inserted replaced
7692:90ddd53cbe08 7695:56ce32cfd6d9
1 local events = require "util.events"; 1 local events = require "util.events";
2 local t_remove = table.remove; 2 local cache = require "util.cache";
3 3
4 local service = {}; 4 local service = {};
5 local service_mt = { __index = service }; 5 local service_mt = { __index = service };
6 6
7 local default_config = { __index = { 7 local default_config = { __index = {
214 -- 214 --
215 if self.nodes[node] then 215 if self.nodes[node] then
216 return false, "conflict"; 216 return false, "conflict";
217 end 217 end
218 218
219 self.data[node] = {};
220 self.nodes[node] = { 219 self.nodes[node] = {
221 name = node; 220 name = node;
222 subscribers = {}; 221 subscribers = {};
223 config = setmetatable(options or {}, {__index=self.node_defaults}); 222 config = setmetatable(options or {}, {__index=self.node_defaults});
224 affiliations = {}; 223 affiliations = {};
225 }; 224 };
226 setmetatable(self.nodes[node], { __index = { data = self.data[node] } }); -- COMPAT 225 self.data[node] = cache.new(self.nodes[node].config["pubsub#max_items"]);
227 self.events.fire_event("node-created", { node = node, actor = actor }); 226 self.events.fire_event("node-created", { node = node, actor = actor });
228 local ok, err = self:set_affiliation(node, true, actor, "owner"); 227 local ok, err = self:set_affiliation(node, true, actor, "owner");
229 if not ok then 228 if not ok then
230 self.nodes[node] = nil; 229 self.nodes[node] = nil;
231 self.data[node] = nil; 230 self.data[node] = nil;
248 self.events.fire_event("node-deleted", { node = node, actor = actor }); 247 self.events.fire_event("node-deleted", { node = node, actor = actor });
249 self.config.broadcaster("delete", node, node_obj.subscribers); 248 self.config.broadcaster("delete", node, node_obj.subscribers);
250 return true; 249 return true;
251 end 250 end
252 251
253 local function remove_item_by_id(data, id)
254 if not data[id] then return end
255 data[id] = nil;
256 for i, _id in ipairs(data) do
257 if id == _id then
258 t_remove(data, i);
259 return i;
260 end
261 end
262 end
263
264 local function trim_items(data, max)
265 max = tonumber(max);
266 if not max or #data <= max then return end
267 repeat
268 data[t_remove(data, 1)] = nil;
269 until #data <= max
270 end
271
272 function service:publish(node, actor, id, item) 252 function service:publish(node, actor, id, item)
273 -- Access checking 253 -- Access checking
274 if not self:may(node, actor, "publish") then 254 if not self:may(node, actor, "publish") then
275 return false, "forbidden"; 255 return false, "forbidden";
276 end 256 end
285 return ok, err; 265 return ok, err;
286 end 266 end
287 node_obj = self.nodes[node]; 267 node_obj = self.nodes[node];
288 end 268 end
289 local node_data = self.data[node]; 269 local node_data = self.data[node];
290 remove_item_by_id(node_data, id); 270 local ok = node_data:set(id, item);
291 node_data[#node_data + 1] = id; 271 if not ok then
292 node_data[id] = item; 272 return nil, "internal-server-error";
293 trim_items(node_data, node_obj.config["pubsub#max_items"]); 273 end
294 self.events.fire_event("item-published", { node = node, actor = actor, id = id, item = item }); 274 self.events.fire_event("item-published", { node = node, actor = actor, id = id, item = item });
295 self.config.broadcaster("items", node, node_obj.subscribers, item, actor); 275 self.config.broadcaster("items", node, node_obj.subscribers, item, actor);
296 return true; 276 return true;
297 end 277 end
298 278
301 if not self:may(node, actor, "retract") then 281 if not self:may(node, actor, "retract") then
302 return false, "forbidden"; 282 return false, "forbidden";
303 end 283 end
304 -- 284 --
305 local node_obj = self.nodes[node]; 285 local node_obj = self.nodes[node];
306 if (not node_obj) or (not self.data[node][id]) then 286 if (not node_obj) or (not self.data[node]:get(id)) then
307 return false, "item-not-found"; 287 return false, "item-not-found";
288 end
289 local ok = self.data[node]:set(id, nil);
290 if not ok then
291 return nil, "internal-server-error";
308 end 292 end
309 self.events.fire_event("item-retracted", { node = node, actor = actor, id = id }); 293 self.events.fire_event("item-retracted", { node = node, actor = actor, id = id });
310 remove_item_by_id(self.data[node], id);
311 if retract then 294 if retract then
312 self.config.broadcaster("items", node, node_obj.subscribers, retract); 295 self.config.broadcaster("items", node, node_obj.subscribers, retract);
313 end 296 end
314 return true 297 return true
315 end 298 end
322 -- 305 --
323 local node_obj = self.nodes[node]; 306 local node_obj = self.nodes[node];
324 if not node_obj then 307 if not node_obj then
325 return false, "item-not-found"; 308 return false, "item-not-found";
326 end 309 end
327 self.data[node] = {}; -- Purge 310 self.data[node] = cache.new(node_obj.config["pubsub#max_items"]); -- Purge
328 self.events.fire_event("node-purged", { node = node, actor = actor }); 311 self.events.fire_event("node-purged", { node = node, actor = actor });
329 if notify then 312 if notify then
330 self.config.broadcaster("purge", node, node_obj.subscribers); 313 self.config.broadcaster("purge", node, node_obj.subscribers);
331 end 314 end
332 return true 315 return true
341 local node_obj = self.nodes[node]; 324 local node_obj = self.nodes[node];
342 if not node_obj then 325 if not node_obj then
343 return false, "item-not-found"; 326 return false, "item-not-found";
344 end 327 end
345 if id then -- Restrict results to a single specific item 328 if id then -- Restrict results to a single specific item
346 return true, { id, [id] = self.data[node][id] }; 329 return true, { id, [id] = self.data[node]:get(id) };
347 else 330 else
348 return true, self.data[node]; 331 local data = {}
332 for key, value in self.data[node]:items() do
333 data[key] = value;
334 end
335 return true, data;
349 end 336 end
350 end 337 end
351 338
352 function service:get_nodes(actor) 339 function service:get_nodes(actor)
353 -- Access checking 340 -- Access checking
433 end 420 end
434 421
435 for k,v in pairs(new_config) do 422 for k,v in pairs(new_config) do
436 node_obj.config[k] = v; 423 node_obj.config[k] = v;
437 end 424 end
438 trim_items(self.data[node], node_obj.config["pubsub#max_items"]); 425 local new_data = cache.new(node_obj.config["pubsub#max_items"]);
439 426 for key, value in self.data[node]:items() do
427 new_data:set(key, value);
428 end
429 self.data[node] = new_data;
440 return true; 430 return true;
441 end 431 end
442 432
443 return { 433 return {
444 new = new; 434 new = new;