Changeset

4511:97fac0ba0469

mod_pubsub_subscription: New module providing an API for pubsub subscriptions This lets other modules hook events from local or remote XEP-0060 pubsub services. API allows keeping track of items added, removed or if the whole node gets cleared or even deleted. Requested by MattJ.
author Kim Alvefur <zash@zash.se>
date Mon, 15 Mar 2021 16:31:23 +0100
parents 4510:6690586826e8
children 4512:b88f05c878ac
files mod_pubsub_subscription/README.markdown mod_pubsub_subscription/mod_pubsub_subscription.lua
diffstat 2 files changed, 230 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_subscription/README.markdown	Mon Mar 15 16:31:23 2021 +0100
@@ -0,0 +1,85 @@
+# Introduction
+
+This module lets you programmatically subscribe to updates from a
+[pubsub][xep0060] node, even if the pubsub service is remote.
+
+## Example
+
+``` {.lua}
+module:depends("pubsub_subscription");
+module:add_item("pubsub-subscription", {
+    service = "pubsub.example.com";
+    node = "otter_facts";
+
+    -- Callbacks:
+    on_subscribed = function()
+        module:log("info", "Otter facts incoming!");
+    end;
+
+    on_item = function(event)
+        module:log("info", "Random Otter Fact: %s", event.payload:get_text());
+    end;
+});
+```
+
+## Usage
+
+Ensure the module is loaded and add your subscription via the
+`:add_item` API. The item table MUST have `service` and `node` fields
+and SHOULD have one or more `on_<event>` callbacks.
+
+The JID of the pubsub service is given in `service` (could also be the
+JID of an user for advanced PEP usage) and the node is given in,
+unsurprisingly, the `node` field.
+
+The various `on_event` callback functions, if present, gets called when
+new events are received. The most interesting would be `on_item`, which
+receives incoming items. Available events are:
+
+`on_subscribed`
+:   The subscription was successful, events may follow.
+
+`on_unsubscribed`
+:   Subscription was removed successfully, this happens if the
+    subscription is removed, which you would normally never do.
+
+`on_error`
+:   If there was an error subscribing to the pubsub service. Receives a
+    table with `type`, `condition`, `text`, and `extra` fields as
+    argument.
+
+`on_item`
+:   An item publication, the payload itself available in the `payload`
+    field in the table provided as argument. The ID of the item can be
+    found in `item.attr.id`.
+
+`on_retract`
+:   When an item gets retracted (removed by the publisher). The ID of
+    the item can be found in `item.attr.id` of the table argument..
+
+`on_purge`
+:   All the items were removed by the publisher.
+
+`on_delete`
+:   The entire pubsub node was removed from the pubsub service. No
+    subscription exists after this.
+
+``` {.lua}
+event_payload = {
+    -- Common prosody event entries:
+    stanza = util.stanza;
+    origin = util.session;
+
+    -- PubSub service details
+    service = "pubsub.example.com";
+    node = "otter_facts";
+
+    -- The pubsub event itself
+    item = util.stanza; -- <item/>
+    payload = util.stanza; -- actual payload, child of <item/>
+}
+```
+
+# Compatibility
+
+Should work with Prosody \>= 0.11.x
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_subscription/mod_pubsub_subscription.lua	Mon Mar 15 16:31:23 2021 +0100
@@ -0,0 +1,145 @@
+local st = require "util.stanza";
+local uuid = require "util.uuid";
+local mt = require "util.multitable";
+local cache = require "util.cache";
+
+local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
+local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event";
+
+-- TODO persist
+-- TODO query known pubsub nodes to sync current subscriptions
+-- TODO subscription ids per 'item' would be handy
+
+local pending_subscription = cache.new(256); -- uuid → node
+local pending_unsubscription = cache.new(256); -- uuid → node
+local active_subscriptions = mt.new() -- service | node | uuid | { item }
+function module.save()
+	return { active_subscriptions = active_subscriptions.data }
+end
+function module.restore(data)
+	if data and data.active_subscriptions then
+		active_subscriptions.data = data.active_subscriptions
+	end
+end
+
+local valid_events = {"subscribed"; "unsubscribed"; "error"; "item"; "retract"; "purge"; "delete"}
+
+local function subscription_added(item_event)
+	local item = item_event.item;
+	assert(item.service, "pubsub subscription item MUST have a 'service' field.");
+	assert(item.node, "pubsub subscription item MUST have a 'node' field.");
+
+	local already_subscibed = false;
+	for _ in active_subscriptions:iter(item.service, item.node, nil) do -- luacheck: ignore 512
+		already_subscibed = true;
+		break
+	end
+
+	item._id = uuid.generate();
+	local iq_id = uuid.generate();
+	pending_subscription:set(iq_id, item._id);
+	active_subscriptions:set(item.service, item.node, item._id, item);
+
+	if not already_subscibed then
+		module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service })
+			:tag("pubsub", { xmlns = xmlns_pubsub })
+				:tag("subscribe", { jid = module.host, node = item.node }));
+	end
+end
+
+for _, event_name in ipairs(valid_events) do
+	module:hook("pubsub-event/host/"..event_name, function (event)
+		for _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, nil, "on_"..event_name) do
+			pcall(cb, event);
+		end
+	end);
+end
+
+module:hook("iq/host", function (event)
+	local stanza = event.stanza;
+	local service = stanza.attr.from;
+
+	if not stanza.attr.id then return end -- shouldn't be possible
+
+	local subscribed_node = pending_subscription:get(stanza.attr.id);
+	pending_subscription:set(stanza.attr.id, nil);
+	local unsubscribed_node = pending_unsubscription:get(stanza.attr.id);
+	pending_unsubscription:set(stanza.attr.id, nil);
+
+	if stanza.attr.type == "result" then
+		local pubsub_wrapper = stanza:get_child("pubsub", xmlns_pubsub);
+		local subscription = pubsub_wrapper and pubsub_wrapper:get_child("subscription");
+		if not subscription then return end
+		local node = subscription.attr.node;
+
+		local what;
+		if subscription.attr.subscription == "subscribed" then
+			what = "on_subscribed";
+		elseif subscription.attr.subscription == "none" then
+			what = "on_unsubscribed";
+		end
+		if not what then return end -- there are other states but we don't handle them
+		for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, what) do
+			cb(event);
+		end
+		return true;
+
+	elseif stanza.attr.type == "error" then
+		local node = subscribed_node or unsubscribed_node;
+		local error_type, error_condition, reason, pubsub_error = stanza:get_error();
+		local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error };
+		if active_subscriptions:get(service) then
+			for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, "on_error") do
+				cb(err);
+			end
+			return true;
+		end
+	end
+end, 1);
+
+local function subscription_removed(item_event)
+	local item = item_event.item;
+	active_subscriptions:set(item.service, item.node, item._id, nil);
+	local node_subs = active_subscriptions:get(item.service, item.node);
+	if node_subs and next(node_subs) then return end
+
+	local iq_id = uuid.generate();
+	pending_unsubscription:set(iq_id, item._id);
+
+	module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service })
+		:tag("pubsub", { xmlns = xmlns_pubsub })
+			:tag("unsubscribe", { jid = module.host, node = item.node }))
+end
+
+module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true);
+
+module:hook("message/host", function(event)
+	local origin, stanza = event.origin, event.stanza;
+	local ret = nil;
+	local service = stanza.attr.from;
+	module:log("debug", "Got message/host: %s", stanza:top_tag());
+	for event_container in stanza:childtags("event", xmlns_pubsub_event) do
+		for pubsub_event in event_container:childtags() do
+			module:log("debug", "Got pubsub event %s", pubsub_event:top_tag());
+			local node = pubsub_event.attr.node;
+			module:fire_event("pubsub-event/host/"..pubsub_event.name, {
+					stanza = stanza;
+					origin = origin;
+					event = pubsub_event;
+					service = service;
+					node = node;
+				});
+			ret = true;
+		end
+	end
+	return ret;
+end);
+
+module:hook("pubsub-event/host/items", function (event)
+	for item in event.event:childtags() do
+		module:log("debug", "Got pubsub item event %s", item:top_tag());
+		event.item = item;
+		event.payload = item.tags[1];
+		module:fire_event("pubsub-event/host/"..item.name, event);
+	end
+end);