Software /
code /
prosody
Comparison
util/pubsub.lua @ 7697:bd854e762875
Merge 0.10->trunk
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sun, 16 Oct 2016 00:39:10 +0200 |
parent | 6791:e813e8cf6046 |
parent | 7696:1c410b4f3a58 |
child | 7704:5022e6181193 |
comparison
equal
deleted
inserted
replaced
7694:bffbea1187ca | 7697:bd854e762875 |
---|---|
1 local events = require "util.events"; | 1 local events = require "util.events"; |
2 local t_remove = table.remove; | 2 local cache = require "util.cache"; |
3 | 3 |
4 local service = {}; | 4 local service = {}; |
5 local service_mt = { __index = service }; | 5 local service_mt = { __index = service }; |
6 | 6 |
7 local default_config = { __index = { | 7 local default_config = { __index = { |
8 itemstore = function (config) return cache.new(tonumber(config["pubsub#max_items"])) end; | |
8 broadcaster = function () end; | 9 broadcaster = function () end; |
9 get_affiliation = function () end; | 10 get_affiliation = function () end; |
10 capabilities = {}; | 11 capabilities = {}; |
11 } }; | 12 } }; |
12 local default_node_config = { __index = { | 13 local default_node_config = { __index = { |
214 -- | 215 -- |
215 if self.nodes[node] then | 216 if self.nodes[node] then |
216 return false, "conflict"; | 217 return false, "conflict"; |
217 end | 218 end |
218 | 219 |
219 self.data[node] = {}; | |
220 self.nodes[node] = { | 220 self.nodes[node] = { |
221 name = node; | 221 name = node; |
222 subscribers = {}; | 222 subscribers = {}; |
223 config = setmetatable(options or {}, {__index=self.node_defaults}); | 223 config = setmetatable(options or {}, {__index=self.node_defaults}); |
224 affiliations = {}; | 224 affiliations = {}; |
225 }; | 225 }; |
226 setmetatable(self.nodes[node], { __index = { data = self.data[node] } }); -- COMPAT | 226 self.data[node] = self.config.itemstore(self.nodes[node].config); |
227 self.events.fire_event("node-created", { node = node, actor = actor }); | 227 self.events.fire_event("node-created", { node = node, actor = actor }); |
228 local ok, err = self:set_affiliation(node, true, actor, "owner"); | 228 local ok, err = self:set_affiliation(node, true, actor, "owner"); |
229 if not ok then | 229 if not ok then |
230 self.nodes[node] = nil; | 230 self.nodes[node] = nil; |
231 self.data[node] = nil; | 231 self.data[node] = nil; |
248 self.events.fire_event("node-deleted", { node = node, actor = actor }); | 248 self.events.fire_event("node-deleted", { node = node, actor = actor }); |
249 self.config.broadcaster("delete", node, node_obj.subscribers); | 249 self.config.broadcaster("delete", node, node_obj.subscribers); |
250 return true; | 250 return true; |
251 end | 251 end |
252 | 252 |
253 local function remove_item_by_id(data, id) | |
254 if not data[id] then return end | |
255 data[id] = nil; | |
256 for i, _id in ipairs(data) do | |
257 if id == _id then | |
258 t_remove(data, i); | |
259 return i; | |
260 end | |
261 end | |
262 end | |
263 | |
264 local function trim_items(data, max) | |
265 max = tonumber(max); | |
266 if not max or #data <= max then return end | |
267 repeat | |
268 data[t_remove(data, 1)] = nil; | |
269 until #data <= max | |
270 end | |
271 | |
272 function service:publish(node, actor, id, item) | 253 function service:publish(node, actor, id, item) |
273 -- Access checking | 254 -- Access checking |
274 if not self:may(node, actor, "publish") then | 255 if not self:may(node, actor, "publish") then |
275 return false, "forbidden"; | 256 return false, "forbidden"; |
276 end | 257 end |
285 return ok, err; | 266 return ok, err; |
286 end | 267 end |
287 node_obj = self.nodes[node]; | 268 node_obj = self.nodes[node]; |
288 end | 269 end |
289 local node_data = self.data[node]; | 270 local node_data = self.data[node]; |
290 remove_item_by_id(node_data, id); | 271 local ok = node_data:set(id, item); |
291 node_data[#node_data + 1] = id; | 272 if not ok then |
292 node_data[id] = item; | 273 return nil, "internal-server-error"; |
293 trim_items(node_data, node_obj.config["pubsub#max_items"]); | 274 end |
294 self.events.fire_event("item-published", { node = node, actor = actor, id = id, item = item }); | 275 self.events.fire_event("item-published", { node = node, actor = actor, id = id, item = item }); |
295 self.config.broadcaster("items", node, node_obj.subscribers, item, actor); | 276 self.config.broadcaster("items", node, node_obj.subscribers, item, actor); |
296 return true; | 277 return true; |
297 end | 278 end |
298 | 279 |
301 if not self:may(node, actor, "retract") then | 282 if not self:may(node, actor, "retract") then |
302 return false, "forbidden"; | 283 return false, "forbidden"; |
303 end | 284 end |
304 -- | 285 -- |
305 local node_obj = self.nodes[node]; | 286 local node_obj = self.nodes[node]; |
306 if (not node_obj) or (not self.data[node][id]) then | 287 if (not node_obj) or (not self.data[node]:get(id)) then |
307 return false, "item-not-found"; | 288 return false, "item-not-found"; |
289 end | |
290 local ok = self.data[node]:set(id, nil); | |
291 if not ok then | |
292 return nil, "internal-server-error"; | |
308 end | 293 end |
309 self.events.fire_event("item-retracted", { node = node, actor = actor, id = id }); | 294 self.events.fire_event("item-retracted", { node = node, actor = actor, id = id }); |
310 remove_item_by_id(self.data[node], id); | |
311 if retract then | 295 if retract then |
312 self.config.broadcaster("items", node, node_obj.subscribers, retract); | 296 self.config.broadcaster("items", node, node_obj.subscribers, retract); |
313 end | 297 end |
314 return true | 298 return true |
315 end | 299 end |
322 -- | 306 -- |
323 local node_obj = self.nodes[node]; | 307 local node_obj = self.nodes[node]; |
324 if not node_obj then | 308 if not node_obj then |
325 return false, "item-not-found"; | 309 return false, "item-not-found"; |
326 end | 310 end |
327 self.data[node] = {}; -- Purge | 311 self.data[node] = self.config.itemstore(self.nodes[node].config); |
328 self.events.fire_event("node-purged", { node = node, actor = actor }); | 312 self.events.fire_event("node-purged", { node = node, actor = actor }); |
329 if notify then | 313 if notify then |
330 self.config.broadcaster("purge", node, node_obj.subscribers); | 314 self.config.broadcaster("purge", node, node_obj.subscribers); |
331 end | 315 end |
332 return true | 316 return true |
341 local node_obj = self.nodes[node]; | 325 local node_obj = self.nodes[node]; |
342 if not node_obj then | 326 if not node_obj then |
343 return false, "item-not-found"; | 327 return false, "item-not-found"; |
344 end | 328 end |
345 if id then -- Restrict results to a single specific item | 329 if id then -- Restrict results to a single specific item |
346 return true, { id, [id] = self.data[node][id] }; | 330 return true, { id, [id] = self.data[node]:get(id) }; |
347 else | 331 else |
348 return true, self.data[node]; | 332 local data = {} |
333 for key, value in self.data[node]:items() do | |
334 data[key] = value; | |
335 end | |
336 return true, data; | |
349 end | 337 end |
350 end | 338 end |
351 | 339 |
352 function service:get_nodes(actor) | 340 function service:get_nodes(actor) |
353 -- Access checking | 341 -- Access checking |
433 end | 421 end |
434 | 422 |
435 for k,v in pairs(new_config) do | 423 for k,v in pairs(new_config) do |
436 node_obj.config[k] = v; | 424 node_obj.config[k] = v; |
437 end | 425 end |
438 trim_items(self.data[node], node_obj.config["pubsub#max_items"]); | 426 local new_data = self.config.itemstore(self.nodes[node].config); |
439 | 427 for key, value in self.data[node]:items() do |
428 new_data:set(key, value); | |
429 end | |
430 self.data[node] = new_data; | |
440 return true; | 431 return true; |
441 end | 432 end |
442 | 433 |
443 return { | 434 return { |
444 new = new; | 435 new = new; |