Software / code / prosody-modules
Comparison
mod_pubsub_subscription/mod_pubsub_subscription.lua @ 4511:97fac0ba0469
mod_pubsub_subscription: New module providing an API for pubsub subscriptions
This lets other modules hook events from local or remote XEP-0060 pubsub
services. API allows keeping track of items added, removed or if the
whole node gets cleared or even deleted.
Requested by MattJ.
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Mon, 15 Mar 2021 16:31:23 +0100 |
| child | 5674:b40750891bee |
comparison
equal
deleted
inserted
replaced
| 4510:6690586826e8 | 4511:97fac0ba0469 |
|---|---|
| 1 local st = require "util.stanza"; | |
| 2 local uuid = require "util.uuid"; | |
| 3 local mt = require "util.multitable"; | |
| 4 local cache = require "util.cache"; | |
| 5 | |
| 6 local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; | |
| 7 local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event"; | |
| 8 | |
| 9 -- TODO persist | |
| 10 -- TODO query known pubsub nodes to sync current subscriptions | |
| 11 -- TODO subscription ids per 'item' would be handy | |
| 12 | |
| 13 local pending_subscription = cache.new(256); -- uuid → node | |
| 14 local pending_unsubscription = cache.new(256); -- uuid → node | |
| 15 local active_subscriptions = mt.new() -- service | node | uuid | { item } | |
| 16 function module.save() | |
| 17 return { active_subscriptions = active_subscriptions.data } | |
| 18 end | |
| 19 function module.restore(data) | |
| 20 if data and data.active_subscriptions then | |
| 21 active_subscriptions.data = data.active_subscriptions | |
| 22 end | |
| 23 end | |
| 24 | |
| 25 local valid_events = {"subscribed"; "unsubscribed"; "error"; "item"; "retract"; "purge"; "delete"} | |
| 26 | |
| 27 local function subscription_added(item_event) | |
| 28 local item = item_event.item; | |
| 29 assert(item.service, "pubsub subscription item MUST have a 'service' field."); | |
| 30 assert(item.node, "pubsub subscription item MUST have a 'node' field."); | |
| 31 | |
| 32 local already_subscibed = false; | |
| 33 for _ in active_subscriptions:iter(item.service, item.node, nil) do -- luacheck: ignore 512 | |
| 34 already_subscibed = true; | |
| 35 break | |
| 36 end | |
| 37 | |
| 38 item._id = uuid.generate(); | |
| 39 local iq_id = uuid.generate(); | |
| 40 pending_subscription:set(iq_id, item._id); | |
| 41 active_subscriptions:set(item.service, item.node, item._id, item); | |
| 42 | |
| 43 if not already_subscibed then | |
| 44 module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service }) | |
| 45 :tag("pubsub", { xmlns = xmlns_pubsub }) | |
| 46 :tag("subscribe", { jid = module.host, node = item.node })); | |
| 47 end | |
| 48 end | |
| 49 | |
| 50 for _, event_name in ipairs(valid_events) do | |
| 51 module:hook("pubsub-event/host/"..event_name, function (event) | |
| 52 for _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, nil, "on_"..event_name) do | |
| 53 pcall(cb, event); | |
| 54 end | |
| 55 end); | |
| 56 end | |
| 57 | |
| 58 module:hook("iq/host", function (event) | |
| 59 local stanza = event.stanza; | |
| 60 local service = stanza.attr.from; | |
| 61 | |
| 62 if not stanza.attr.id then return end -- shouldn't be possible | |
| 63 | |
| 64 local subscribed_node = pending_subscription:get(stanza.attr.id); | |
| 65 pending_subscription:set(stanza.attr.id, nil); | |
| 66 local unsubscribed_node = pending_unsubscription:get(stanza.attr.id); | |
| 67 pending_unsubscription:set(stanza.attr.id, nil); | |
| 68 | |
| 69 if stanza.attr.type == "result" then | |
| 70 local pubsub_wrapper = stanza:get_child("pubsub", xmlns_pubsub); | |
| 71 local subscription = pubsub_wrapper and pubsub_wrapper:get_child("subscription"); | |
| 72 if not subscription then return end | |
| 73 local node = subscription.attr.node; | |
| 74 | |
| 75 local what; | |
| 76 if subscription.attr.subscription == "subscribed" then | |
| 77 what = "on_subscribed"; | |
| 78 elseif subscription.attr.subscription == "none" then | |
| 79 what = "on_unsubscribed"; | |
| 80 end | |
| 81 if not what then return end -- there are other states but we don't handle them | |
| 82 for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, what) do | |
| 83 cb(event); | |
| 84 end | |
| 85 return true; | |
| 86 | |
| 87 elseif stanza.attr.type == "error" then | |
| 88 local node = subscribed_node or unsubscribed_node; | |
| 89 local error_type, error_condition, reason, pubsub_error = stanza:get_error(); | |
| 90 local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error }; | |
| 91 if active_subscriptions:get(service) then | |
| 92 for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, "on_error") do | |
| 93 cb(err); | |
| 94 end | |
| 95 return true; | |
| 96 end | |
| 97 end | |
| 98 end, 1); | |
| 99 | |
| 100 local function subscription_removed(item_event) | |
| 101 local item = item_event.item; | |
| 102 active_subscriptions:set(item.service, item.node, item._id, nil); | |
| 103 local node_subs = active_subscriptions:get(item.service, item.node); | |
| 104 if node_subs and next(node_subs) then return end | |
| 105 | |
| 106 local iq_id = uuid.generate(); | |
| 107 pending_unsubscription:set(iq_id, item._id); | |
| 108 | |
| 109 module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service }) | |
| 110 :tag("pubsub", { xmlns = xmlns_pubsub }) | |
| 111 :tag("unsubscribe", { jid = module.host, node = item.node })) | |
| 112 end | |
| 113 | |
| 114 module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true); | |
| 115 | |
| 116 module:hook("message/host", function(event) | |
| 117 local origin, stanza = event.origin, event.stanza; | |
| 118 local ret = nil; | |
| 119 local service = stanza.attr.from; | |
| 120 module:log("debug", "Got message/host: %s", stanza:top_tag()); | |
| 121 for event_container in stanza:childtags("event", xmlns_pubsub_event) do | |
| 122 for pubsub_event in event_container:childtags() do | |
| 123 module:log("debug", "Got pubsub event %s", pubsub_event:top_tag()); | |
| 124 local node = pubsub_event.attr.node; | |
| 125 module:fire_event("pubsub-event/host/"..pubsub_event.name, { | |
| 126 stanza = stanza; | |
| 127 origin = origin; | |
| 128 event = pubsub_event; | |
| 129 service = service; | |
| 130 node = node; | |
| 131 }); | |
| 132 ret = true; | |
| 133 end | |
| 134 end | |
| 135 return ret; | |
| 136 end); | |
| 137 | |
| 138 module:hook("pubsub-event/host/items", function (event) | |
| 139 for item in event.event:childtags() do | |
| 140 module:log("debug", "Got pubsub item event %s", item:top_tag()); | |
| 141 event.item = item; | |
| 142 event.payload = item.tags[1]; | |
| 143 module:fire_event("pubsub-event/host/"..item.name, event); | |
| 144 end | |
| 145 end); |