Software /
code /
prosody-modules
File
mod_pubsub_serverinfo/mod_pubsub_serverinfo.lua @ 5854:801f64e6d4e9
mod_pubsub_mqtt: Add TLS port (default 8883) for MQTT connections
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Tue, 30 Jan 2024 14:11:35 +0000 |
parent | 5840:7905766d01f6 |
child | 5867:79ae71f52c81 |
line wrap: on
line source
local http = require "net.http"; local json = require "util.json"; local st = require "util.stanza"; local new_id = require"util.id".medium; local dataform = require "util.dataforms".new; local local_domain = module:get_host(); local service = module:get_option(module.name .. "_service") or "pubsub." .. local_domain; local node = module:get_option(module.name .. "_node") or "serverinfo"; local actor = module.host .. "/modules/" .. module.name; local publication_interval = module:get_option(module.name .. "_publication_interval") or 300; local cache_ttl = module:get_option(module.name .. "_cache_ttl") or 3600; local public_providers_url = module:get_option_string(module.name.."_public_providers_url", "https://data.xmpp.net/providers/v2/providers-Ds.json"); local delete_node_on_unload = module:get_option_boolean(module.name.."_delete_node_on_unload", false); local persist_items = module:get_option_boolean(module.name.."_persist_items", true); local xmlns_pubsub = "http://jabber.org/protocol/pubsub"; function module.load() discover_node():next( function(exists) if not exists then create_node() end end ):catch( function(error) module:log("warn", "Error prevented discovery or creation of pub/sub node at %s: %s", service, error) end ) module:add_feature("urn:xmpp:serverinfo:0"); module:add_extension(dataform { { name = "FORM_TYPE", type = "hidden", value = "http://jabber.org/network/serverinfo" }, { name = "serverinfo-pubsub-node", type = "text-single" }, }:form({ ["serverinfo-pubsub-node"] = ("xmpp:%s?;node=%s"):format(service, node) }, "result")); if cache_ttl < publication_interval then module:log("warn", "It is recommended to have a cache interval higher than the publication interval"); end cache_warm_up() module:add_timer(10, publish_serverinfo); end function module.unload() -- This removes all subscribers, which may or may not be desirable, depending on the reason for the unload. if delete_node_on_unload then delete_node(); end end -- Returns a promise of a boolean function discover_node() local request = st.iq({ type = "get", to = service, from = actor, id = new_id() }) :tag("query", { xmlns = "http://jabber.org/protocol/disco#items" }) module:log("debug", "Sending request to discover existence of pub/sub node '%s' at %s", node, service) return module:send_iq(request):next( function(response) if response.stanza == nil or response.stanza.attr.type ~= "result" then module:log("warn", "Unexpected response to service discovery items request at %s: %s", service, response.stanza) return false end local query = response.stanza:get_child("query", "http://jabber.org/protocol/disco#items") if query ~= nil then for item in query:childtags("item") do if item.attr.jid == service and item.attr.node == node then module:log("debug", "pub/sub node '%s' at %s does exist.", node, service) return true end end end module:log("debug", "pub/sub node '%s' at %s does not exist.", node, service) return false; end ); end -- Returns a promise of a boolean function create_node() local request = st.iq({ type = "set", to = service, from = actor, id = new_id() }) :tag("pubsub", { xmlns = xmlns_pubsub }) :tag("create", { node = node, xmlns = xmlns_pubsub }):up() :tag("configure", { xmlns = xmlns_pubsub }) :tag("x", { xmlns = "jabber:x:data", type = "submit" }) :tag("field", { var = "FORM_TYPE", type = "hidden"}) :text_tag("value", "http://jabber.org/protocol/pubsub#node_config") :up() :tag("field", { var = "pubsub#max_items" }) :text_tag("value", "1") :up() :tag("field", { var = "pubsub#persist_items" }) :text_tag("value", persist_items and "1" or "0") module:log("debug", "Sending request to create pub/sub node '%s' at %s", node, service) return module:send_iq(request):next( function(response) if response.stanza == nil or response.stanza.attr.type ~= "result" then module:log("warn", "Unexpected response to pub/sub node '%s' creation request at %s: %s", node, service, response.stanza) return false else module:log("debug", "Successfully created pub/sub node '%s' at %s", node, service) return true end end ) end -- Returns a promise of a boolean function delete_node() local request = st.iq({ type = "set", to = service, from = actor, id = new_id() }) :tag("pubsub", { xmlns = xmlns_pubsub }) :tag("delete", { node = node, xmlns = xmlns_pubsub }); module:log("debug", "Sending request to delete pub/sub node '%s' at %s", node, service) return module:send_iq(request):next( function(response) if response.stanza == nil or response.stanza.attr.type ~= "result" then module:log("warn", "Unexpected response to pub/sub node '%s' deletion request at %s: %s", node, service, response.stanza) return false else module:log("debug", "Successfully deleted pub/sub node '%s' at %s", node, service) return true end end ) end function get_remote_domain_names() -- Iterate over s2s sessions, adding them to a multimap, where the key is the local domain name, -- mapped to a collection of remote domain names. De-duplicate all remote domain names by using -- them as an index in a table. local domains_by_host = {} for session, _ in pairs(prosody.incoming_s2s) do if session ~= nil and session.from_host ~= nil and local_domain == session.to_host then module:log("debug", "Local host '%s' has remote '%s' (inbound)", session.to_host, session.from_host); local sessions = domains_by_host[session.to_host] if sessions == nil then sessions = {} end; -- instantiate a new entry if none existed sessions[session.from_host] = true domains_by_host[session.to_host] = sessions end end -- At an earlier stage, the code iterated over all prosody.hosts, trying to generate one pubsub item for all local hosts. That turned out to be -- to noisy. Instead, this code now creates an item that includes the local vhost only. It is assumed that this module will also be loaded for -- other vhosts. Their data should then be published to distinct pub/sub services and nodes. -- for host, data in pairs(prosody.hosts) do local host = local_domain local data = prosody.hosts[host] if data ~= nil then local sessions = domains_by_host[host] if sessions == nil then sessions = {} end; -- instantiate a new entry if none existed if data.s2sout ~= nil then for _, session in pairs(data.s2sout) do if session.to_host ~= nil then module:log("debug", "Local host '%s' has remote '%s' (outbound)", host, session.to_host); sessions[session.to_host] = true domains_by_host[host] = sessions end end end -- When the instance of Prosody hosts more than one host, the other hosts can be thought of as having a 'permanent' s2s connection. for host_name, host_info in pairs(prosody.hosts) do if host ~= host_name and host_info.type ~= "component" then module:log("debug", "Local host '%s' has remote '%s' (vhost)", host, host_name); sessions[host_name] = true; domains_by_host[host] = sessions end end end return domains_by_host end function publish_serverinfo() module:log("debug", "Publishing server info..."); local domains_by_host = get_remote_domain_names() -- Build the publication stanza. local request = st.iq({ type = "set", to = service, from = actor, id = new_id() }) :tag("pubsub", { xmlns = xmlns_pubsub }) :tag("publish", { node = node, xmlns = xmlns_pubsub }) :tag("item", { id = "current", xmlns = xmlns_pubsub }) :tag("serverinfo", { xmlns = "urn:xmpp:serverinfo:0" }) request:tag("domain", { name = local_domain }) :tag("federation") local remotes = domains_by_host[local_domain] if remotes ~= nil then for remote, _ in pairs(remotes) do -- include a domain name for remote domains, but only if they advertise support. if does_opt_in(remote) then request:tag("remote-domain", { name = remote }):up() else request:tag("remote-domain"):up() end end end request:up():up() module:send_iq(request):next( function(response) if response.stanza == nil or response.stanza.attr.type ~= "result" then module:log("warn", "Unexpected response to item publication at pub/sub node '%s' on %s: %s", node, service, response.stanza) return false else module:log("debug", "Successfully published item on pub/sub node '%s' at %s", node, service) return true end end, function(error) module:log("warn", "Error prevented publication of item on pub/sub node at %s: %s", service, error) end ) return publication_interval; end local opt_in_cache = {} -- Public providers are already public, so we fetch the list of providers -- registered on providers.xmpp.net so we don't have to disco them individually local function update_public_providers() return http.request(public_providers_url) :next(function (response) assert( response.headers["content-type"] == "application/json", "invalid mimetype: "..tostring(response.headers["content-type"]) ); return json.decode(response.body); end) :next(function (public_server_domains) module:log("debug", "Retrieved list of %d public providers", #public_server_domains); for _, domain in ipairs(public_server_domains) do opt_in_cache[domain] = { opt_in = true; expires = os.time() + (86400 * 1.5); }; end end, function (err) module:log("warn", "Failed to fetch/decode provider list: %s", err); end); end module:daily("update public provider list", update_public_providers); function cache_warm_up() module:log("debug", "Warming up opt-in cache") update_public_providers():finally(function () module:log("debug", "Querying known domains for opt-in cache..."); local domains_by_host = get_remote_domain_names() local remotes = domains_by_host[local_domain] if remotes ~= nil then for remote in pairs(remotes) do does_opt_in(remote) end end end); end function does_opt_in(remoteDomain) -- try to read answer from cache. local cached_value = opt_in_cache[remoteDomain] local ttl = cached_value and os.difftime(cached_value.expires, os.time()); if cached_value and ttl > (publication_interval + 60) then module:log("debug", "Opt-in status (from cache) for '%s': %s", remoteDomain, cached_value.opt_in) return cached_value.opt_in; end -- We don't have a cached value, or it is nearing expiration - refresh it now -- TODO worry about not having multiple requests in flight to the same domain.cached_value module:log("debug", "%s: performing disco/info to determine opt-in", remoteDomain) local discoRequest = st.iq({ type = "get", to = remoteDomain, from = actor, id = new_id() }) :tag("query", { xmlns = "http://jabber.org/protocol/disco#info" }) module:send_iq(discoRequest):next( function(response) if response.stanza ~= nil and response.stanza.attr.type == "result" then local query = response.stanza:get_child("query", "http://jabber.org/protocol/disco#info") if query ~= nil then for feature in query:childtags("feature") do --module:log("debug", "Disco/info feature for '%s': %s", remoteDomain, feature) if feature.attr.var == 'urn:xmpp:serverinfo:0' then module:log("debug", "Disco/info response included opt-in for '%s'", remoteDomain) opt_in_cache[remoteDomain] = { opt_in = true; expires = os.time() + cache_ttl; } return; -- prevent 'false' to be cached, down below. end end end end module:log("debug", "Disco/info response did not include opt-in for '%s'", remoteDomain) opt_in_cache[remoteDomain] = { opt_in = false; expires = os.time() + cache_ttl; } end, function(response) module:log("debug", "An error occurred while performing a disco/info request to determine opt-in for '%s'", remoteDomain, response) opt_in_cache[remoteDomain] = { opt_in = false; expires = os.time() + cache_ttl; } end ); if ttl and ttl <= 0 then -- Cache entry expired, remove it and assume not opted in opt_in_cache[remoteDomain] = nil; return false; end return cached_value and cached_value.opt_in; end