Comparison

mod_pubsub_subscription/mod_pubsub_subscription.lua @ 4511:97fac0ba0469

mod_pubsub_subscription: New module providing an API for pubsub subscriptions. This lets other modules hook events from local or remote XEP-0060 pubsub services. The API allows keeping track of items being added or removed, and of the whole node being cleared or even deleted. Requested by MattJ.
author Kim Alvefur <zash@zash.se>
date Mon, 15 Mar 2021 16:31:23 +0100
child 5674:b40750891bee
comparison
equal deleted inserted replaced
4510:6690586826e8 4511:97fac0ba0469
1 local st = require "util.stanza";
2 local uuid = require "util.uuid";
3 local mt = require "util.multitable";
4 local cache = require "util.cache";
5
6 local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
7 local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event";
8
9 -- TODO persist
10 -- TODO query known pubsub nodes to sync current subscriptions
11 -- TODO subscription ids per 'item' would be handy
12
-- Module-level bookkeeping.
-- Both pending caches are keyed by the id of an outgoing <iq/> and are
-- consulted (and cleared) by the "iq/host" handler when the reply arrives.
local pending_cache_size = 256;
local pending_subscription = cache.new(pending_cache_size); -- iq id → pending subscribe request
local pending_unsubscription = cache.new(pending_cache_size); -- iq id → pending unsubscribe request
-- Layout of the multitable: service | node | uuid | { item }
local active_subscriptions = mt.new();
--- Builds a snapshot of this module's state.
-- Returns a table whose `active_subscriptions` field is the raw data table
-- behind the active_subscriptions multitable.
-- NOTE(review): presumably invoked by Prosody around a module reload; confirm.
function module.save()
	local snapshot = {};
	snapshot.active_subscriptions = active_subscriptions.data;
	return snapshot;
end
--- Re-installs state previously produced by module.save().
-- @param data table or nil; ignored unless it carries `active_subscriptions`.
function module.restore(data)
	local saved = data and data.active_subscriptions;
	if saved then
		active_subscriptions.data = saved;
	end
end
24
-- Event names for which an "on_<name>" callback on a subscription item is
-- dispatched by the "pubsub-event/host/<name>" hooks registered below.
local valid_events = {
	"subscribed",
	"unsubscribed",
	"error",
	"item",
	"retract",
	"purge",
	"delete",
};
26
--- Handler for a newly added "pubsub-subscription" item.
-- Registers the item under service | node | uuid and, if it is the first
-- item for that service/node pair, sends a subscribe request to the service.
-- @param item_event table with an `item` field; the item MUST carry
--        `service` and `node` and gets a fresh `_id` assigned here.
local function subscription_added(item_event)
	local item = item_event.item;
	assert(item.service, "pubsub subscription item MUST have a 'service' field.");
	assert(item.node, "pubsub subscription item MUST have a 'node' field.");

	-- Has to be determined before the item itself is registered below.
	local node_subs = active_subscriptions:get(item.service, item.node);
	local already_subscribed = node_subs ~= nil and next(node_subs) ~= nil;

	item._id = uuid.generate();
	active_subscriptions:set(item.service, item.node, item._id, item);

	-- Someone is already subscribed to this node, so no request is needed.
	if already_subscribed then return end

	local iq_id = uuid.generate();
	-- Remember the *node* for this request: the iq error handler uses the
	-- cached value as the node key when looking up on_error callbacks.
	-- Only requests that are actually sent are recorded, so the bounded
	-- cache is not filled with ids that can never receive a reply.
	pending_subscription:set(iq_id, item.node);

	module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service })
		:tag("pubsub", { xmlns = xmlns_pubsub })
			:tag("subscribe", { jid = module.host, node = item.node }));
end
49
-- For every supported event name, forward "pubsub-event/host/<name>" to the
-- "on_<name>" callback of each subscription item registered for the event's
-- service and node (nil in the third position matches any subscription id).
for _, event_name in ipairs(valid_events) do
	local callback_name = "on_"..event_name;
	module:hook("pubsub-event/host/"..event_name, function (event)
		for _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, nil, callback_name) do
			-- A failing callback must not stop the remaining subscribers,
			-- but the failure should not vanish silently either.
			local ok, err = pcall(cb, event);
			if not ok then
				module:log("error", "Error in %s callback for node %s on %s: %s",
					callback_name, tostring(event.node), tostring(event.service), tostring(err));
			end
		end
	end);
end
57
-- Runs the named callback of every subscription item registered for
-- service/node, passing `arg`. Each call is protected so that one failing
-- callback neither aborts stanza handling nor starves the other subscribers.
local function call_subscription_callbacks(service, node, callback_name, arg)
	for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, callback_name) do
		local ok, err = pcall(cb, arg);
		if not ok then
			module:log("error", "Error in %s callback for node %s on %s: %s",
				callback_name, tostring(node), tostring(service), tostring(err));
		end
	end
end

-- Handles iq replies addressed to the host:
--  * type="result" carrying <pubsub><subscription/></pubsub> triggers the
--    on_subscribed / on_unsubscribed callbacks (they receive the iq event);
--  * type="error" replying to one of our own pending requests triggers the
--    on_error callbacks (they receive { type, condition, text, extra }).
-- Returns true when the stanza was handled here, nothing otherwise.
module:hook("iq/host", function (event)
	local stanza = event.stanza;
	local service = stanza.attr.from;
	local iq_id = stanza.attr.id;

	if not iq_id then return end -- shouldn't be possible

	-- Look up and clear any pending request this stanza replies to.
	local subscribed_node = pending_subscription:get(iq_id);
	pending_subscription:set(iq_id, nil);
	local unsubscribed_node = pending_unsubscription:get(iq_id);
	pending_unsubscription:set(iq_id, nil);

	if stanza.attr.type == "result" then
		local pubsub_wrapper = stanza:get_child("pubsub", xmlns_pubsub);
		local subscription = pubsub_wrapper and pubsub_wrapper:get_child("subscription");
		if not subscription then return end
		local node = subscription.attr.node;

		local what;
		if subscription.attr.subscription == "subscribed" then
			what = "on_subscribed";
		elseif subscription.attr.subscription == "none" then
			what = "on_unsubscribed";
		end
		if not what then return end -- there are other states but we don't handle them
		call_subscription_callbacks(service, node, what, event);
		return true;

	elseif stanza.attr.type == "error" then
		local node = subscribed_node or unsubscribed_node;
		-- Not a reply to one of our requests. A nil node would act as a
		-- wildcard in the lookup below, firing every on_error callback
		-- registered for this service and swallowing an unrelated stanza.
		if not node then return end

		local error_type, error_condition, reason, pubsub_error = stanza:get_error();
		local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error };
		if active_subscriptions:get(service) then
			call_subscription_callbacks(service, node, "on_error", err);
			return true;
		end
	end
end, 1);
99
--- Handler for a removed "pubsub-subscription" item.
-- Drops the item from the registry and, once no item is left for its
-- service/node pair, sends an unsubscribe request to the service.
-- @param item_event table with an `item` field (carrying `service`, `node`
--        and the `_id` under which the item was registered).
local function subscription_removed(item_event)
	local item = item_event.item;
	active_subscriptions:set(item.service, item.node, item._id, nil);

	-- Other items still rely on this node: keep the subscription alive.
	local node_subs = active_subscriptions:get(item.service, item.node);
	if node_subs and next(node_subs) then return end

	local iq_id = uuid.generate();
	-- Remember the *node* for this request: the iq error handler uses the
	-- cached value as the node key when looking up on_error callbacks.
	pending_unsubscription:set(iq_id, item.node);

	module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service })
		:tag("pubsub", { xmlns = xmlns_pubsub })
			:tag("unsubscribe", { jid = module.host, node = item.node }))
end
113
-- Wire the add/remove handlers to "pubsub-subscription" items.
-- NOTE(review): the meaning of the trailing `true` is defined by
-- module:handle_items and not visible here; confirm against its docs.
module:handle_items(
	"pubsub-subscription",
	subscription_added,
	subscription_removed,
	true
);
115
-- Turns incoming pubsub notifications into internal events: every child of
-- an <event xmlns="http://jabber.org/protocol/pubsub#event"/> element is
-- re-fired as "pubsub-event/host/<child name>". Returns true if at least
-- one such child was seen, nil otherwise.
module:hook("message/host", function(event)
	local stanza = event.stanza;
	local origin = event.origin;
	local from_service = stanza.attr.from;
	local handled = nil;

	module:log("debug", "Got message/host: %s", stanza:top_tag());

	for container in stanza:childtags("event", xmlns_pubsub_event) do
		for child in container:childtags() do
			module:log("debug", "Got pubsub event %s", child:top_tag());
			module:fire_event("pubsub-event/host/"..child.name, {
				stanza = stanza;
				origin = origin;
				event = child;
				service = from_service;
				node = child.attr.node;
			});
			handled = true;
		end
	end

	return handled;
end);
137
-- Fans an <items/> notification out into one event per child element,
-- fired as "pubsub-event/host/<child name>". The same event table is
-- reused for every child, with `item` and `payload` (the child's first
-- tag) overwritten each time.
module:hook("pubsub-event/host/items", function (event)
	local items = event.event;
	for child in items:childtags() do
		module:log("debug", "Got pubsub item event %s", child:top_tag());
		event.item, event.payload = child, child.tags[1];
		module:fire_event("pubsub-event/host/"..child.name, event);
	end
end);