Software / code / prosody-modules
File
mod_pubsub_subscription/mod_pubsub_subscription.lua @ 6325:6ea80b73d8f2
mod_http_oauth2: Only require redirect URIs when using grant types that need it
In the Device flow, no redirect URI is used because the client instead
receives responses by polling. It is therefore unnecessary to enforce a
requirement that these include redirect URI(s).
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Thu, 03 Jul 2025 15:42:42 +0200 |
| parent | 6119:6dca425eea15 |
line wrap: on
line source
-- Client-side pubsub subscription helper.
--
-- Other code registers items under the "pubsub-subscription" item key (see
-- the module:handle_items() call below). Each item names a pubsub `service`
-- and `node`, optionally a `from` address (defaults to module.host), and any
-- number of `on_<event>` callbacks. This module sends the subscribe and
-- unsubscribe requests, and routes incoming pubsub notifications and IQ
-- replies to the matching callbacks.

local id = require "util.id";
local st = require "util.stanza";
local uuid = require "util.uuid";
local mt = require "util.multitable";
local cache = require "util.cache";

local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event";

-- TODO persist
-- TODO query known pubsub nodes to sync current subscriptions
-- TODO subscription ids per 'item' would be handy

-- Outstanding requests, keyed by the id of the IQ we sent. The value is the
-- node the request was about; handle_iq() needs it because it looks the node
-- up from here when an error reply arrives.
local pending_subscription = cache.new(256); -- iq id -> node
local pending_unsubscription = cache.new(256); -- iq id -> node
local active_subscriptions = mt.new() -- service | node | subscriber | uuid | { item }

-- Hand the subscription table over across a module reload.
function module.save()
	return { active_subscriptions = active_subscriptions.data }
end

-- Take back the subscription table produced by module.save(), if any.
function module.restore(data)
	if data and data.active_subscriptions then
		active_subscriptions.data = data.active_subscriptions
	end
end

-- Event names for which an item's `on_<name>` callback is dispatched.
local valid_events = {"subscribed"; "unsubscribed"; "error"; "item"; "retract"; "purge"; "delete"}

-- Item-added handler: records the item and, unless some other item already
-- covers the same service/node/subscriber, sends a subscribe request.
-- Raises (via assert) if the item lacks a 'service' or 'node' field.
local function subscription_added(item_event)
	local item = item_event.item;
	assert(item.service, "pubsub subscription item MUST have a 'service' field.");
	assert(item.node, "pubsub subscription item MUST have a 'node' field.");
	item.from = item.from or module.host;

	-- Any existing entry for this service/node/subscriber means a subscribe
	-- request was already issued on behalf of another item.
	local already_subscribed = false;
	for _ in active_subscriptions:iter(item.service, item.node, item.from, nil) do -- luacheck: ignore 512
		already_subscribed = true;
		break
	end

	item._id = uuid.generate();
	active_subscriptions:set(item.service, item.node, item.from, item._id, item);

	if not already_subscribed then
		-- Only track a pending request when one is actually sent, and remember
		-- the node (not the item uuid) so that an error reply can be routed.
		local iq_id = "pubsub-sub-"..id.short();
		pending_subscription:set(iq_id, item.node);
		module:send(st.iq({ type = "set", id = iq_id, from = item.from, to = item.service })
			:tag("pubsub", { xmlns = xmlns_pubsub })
				:tag("subscribe", { jid = item.from, node = item.node }));
	end
end

-- For every supported event, call the `on_<event>` callback of each item
-- subscribed to the event's service/node on behalf of the addressed JID.
-- Callback errors are contained with pcall so one failing callback does not
-- prevent the others from running.
for _, event_name in ipairs(valid_events) do
	module:hook("pubsub-event/host/"..event_name, function (event)
		for _, _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, event.stanza.attr.to, nil, "on_"..event_name) do
			event.handled = true;
			pcall(cb, event);
		end
	end);

	module:hook("pubsub-event/bare/"..event_name, function (event)
		for _, _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, event.stanza.attr.to, nil, "on_"..event_name) do
			event.handled = true;
			pcall(cb, event);
		end
	end);
end

-- Process a reply to one of our subscribe/unsubscribe requests.
-- `context` is "host" or "bare" (currently unused). Returns true when the
-- reply was dispatched to callbacks, nothing otherwise.
-- Not declared local, so it is visible in the module's global environment.
function handle_iq(context, event)
	local stanza = event.stanza;
	local service = stanza.attr.from;

	if not stanza.attr.id then return end -- shouldn't be possible
	if not stanza.attr.id:match("^pubsub%-sub%-") then return end

	-- Fetch and clear whatever we remembered about this request.
	local subscribed_node = pending_subscription:get(stanza.attr.id);
	pending_subscription:set(stanza.attr.id, nil);
	local unsubscribed_node = pending_unsubscription:get(stanza.attr.id);
	pending_unsubscription:set(stanza.attr.id, nil);

	if stanza.attr.type == "result" then
		local pubsub_wrapper = stanza:get_child("pubsub", xmlns_pubsub);
		local subscription = pubsub_wrapper and pubsub_wrapper:get_child("subscription");
		if not subscription then return end
		local node = subscription.attr.node;

		local what;
		if subscription.attr.subscription == "subscribed" then
			what = "on_subscribed";
		elseif subscription.attr.subscription == "none" then
			what = "on_unsubscribed";
		end
		if not what then return end -- there are other states but we don't handle them
		for _, _, _, _, _, cb in active_subscriptions:iter(service, node, stanza.attr.to, nil, what) do
			cb(event);
		end
		return true;

	elseif stanza.attr.type == "error" then
		-- The error reply is not inspected for a node here, so rely on the
		-- node remembered when the request was sent.
		local node = subscribed_node or unsubscribed_node;
		local error_type, error_condition, reason, pubsub_error = stanza:get_error();
		local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error };
		if active_subscriptions:get(service) then
			for _, _, _, _, _, cb in active_subscriptions:iter(service, node, stanza.attr.to, nil, "on_error") do
				cb(err);
			end
			return true;
		end
	end
end

-- Note: these wrappers do not return handle_iq()'s result.
module:hook("iq/host", function (event)
	handle_iq("host", event);
end, 1);

module:hook("iq/bare", function (event)
	handle_iq("bare", event);
end, 1);

-- Item-removed handler: forgets the item and, once no item remains for the
-- same service/node/subscriber, sends an unsubscribe request.
local function subscription_removed(item_event)
	local item = item_event.item;
	active_subscriptions:set(item.service, item.node, item.from, item._id, nil);
	local node_subs = active_subscriptions:get(item.service, item.node, item.from);
	if node_subs and next(node_subs) then return end

	local iq_id = "pubsub-sub-"..id.short();
	-- Remember the node (not the item uuid), matching how handle_iq() reads it.
	pending_unsubscription:set(iq_id, item.node);

	module:send(st.iq({ type = "set", id = iq_id, from = item.from, to = item.service })
		:tag("pubsub", { xmlns = xmlns_pubsub })
			:tag("unsubscribe", { jid = item.from, node = item.node }))
end

module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true);

-- Split an incoming message into its pubsub events and fire one
-- "pubsub-event/<context>/<name>" event per child of each <event/> element.
-- Returns true only for context "host" when some handler marked an event as
-- handled; otherwise returns nothing.
-- Not declared local, so it is visible in the module's global environment.
function handle_message(context, event)
	local origin, stanza = event.origin, event.stanza;
	local handled = nil;
	local service = stanza.attr.from;
	module:log("debug", "Got message/%s: %s", context, stanza:top_tag());
	for event_container in stanza:childtags("event", xmlns_pubsub_event) do
		for pubsub_event in event_container:childtags() do
			module:log("debug", "Got pubsub event %s", pubsub_event:top_tag());
			local node = pubsub_event.attr.node;
			local event_data = {
				stanza = stanza;
				origin = origin;
				event = pubsub_event;
				service = service;
				node = node;
				handled = false;
			};
			module:fire_event("pubsub-event/" .. context .. "/"..pubsub_event.name, event_data);
			if not handled and event_data.handled then
				handled = true;
			end
		end
	end

	-- If not addressed to the host, let it fall through to normal handling
	-- (it may be on its way to a local client), otherwise, we'll mark the
	-- event as handled to suppress an error response if we handled it.
	if context == "host" and handled then
		return true;
	end
end

module:hook("message/host", function(event)
	return handle_message("host", event);
end);

module:hook("message/bare", function(event)
	return handle_message("bare", event);
end);

-- Expand an <items/> event: for each child element, set event.item and
-- event.payload (the child's first tag) on the same event table and fire
-- "pubsub-event/<context>/<child name>".
-- Not declared local, so it is visible in the module's global environment.
function handle_items(context, event)
	for item in event.event:childtags() do
		module:log("debug", "Got pubsub item event %s", item:top_tag());
		event.item = item;
		event.payload = item.tags[1];
		module:fire_event("pubsub-event/" .. context .. "/"..item.name, event);
	end
end

module:hook("pubsub-event/host/items", function (event)
	handle_items("host", event);
end);

module:hook("pubsub-event/bare/items", function (event)
	handle_items("bare", event);
end);