File

mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 5424:b45d9a81b3da

mod_http_oauth2: Revert role selector, going to try something else Back out f2c7bb3af600 Allowing only a single role to be encoded into the grant takes away the possibility of having multiple roles in the grant, one of which is selected when issuing an access token. It also takes away the ability to have zero roles granted, which could be useful e.g. when you only need OIDC scopes.
author Kim Alvefur <zash@zash.se>
date Sun, 07 May 2023 19:40:57 +0200
parent 5113:85a7304cfea1
child 5854:801f64e6d4e9
line wrap: on
line source

module:set_global();

local mqtt = module:require "mqtt";
local id = require "util.id";
local st = require "util.stanza";

--- Stringify the first child of an item.
-- @param item any indexable value; only item[1] is read
-- @return the result of tostring() applied to item[1]
local function tostring_content(item)
	local first_child = item[1];
	return tostring(first_child);
end

-- Payload translators, keyed by the TYPE segment of an MQTT topic
-- (HOST/TYPE/NODE). Each known translator provides:
--   from_item(item)  -> value looked up in a published pubsub item
--   to_item(payload) -> a new pubsub <item/> stanza wrapping an MQTT payload
-- Unknown types fall back to a translator that only has from_item (tostring),
-- so they can be delivered to subscribers but not published from MQTT.
local data_translators;
do
	local xmlns_pubsub = "http://jabber.org/protocol/pubsub";

	-- Start a fresh pubsub <item/> carrying a newly generated id.
	local function new_item()
		return st.stanza("item", { xmlns = xmlns_pubsub, id = id.medium() });
	end

	local known_translators = {};

	-- Raw text carried in a Prosody <data/> element.
	known_translators.utf8 = {
		from_item = function (item)
			return item:find("{https://prosody.im/protocol/data}data#");
		end;
		to_item = function (payload)
			return new_item():text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" });
		end;
	};

	-- JSON text carried in a urn:xmpp:json:0 <json/> element.
	known_translators.json = {
		from_item = function (item)
			return item:find("{urn:xmpp:json:0}json#");
		end;
		to_item = function (payload)
			return new_item():text_tag("json", payload, { xmlns = "urn:xmpp:json:0" });
		end;
	};

	-- The <title/> of an Atom <entry/>.
	known_translators.atom_title = {
		from_item = function (item)
			return item:find("{http://www.w3.org/2005/Atom}entry/title#");
		end;
		to_item = function (payload)
			local entry = new_item():tag("entry", { xmlns = "http://www.w3.org/2005/Atom" });
			return entry:text_tag("title", payload, { type = "text" });
		end;
	};

	data_translators = setmetatable(known_translators, {
		-- A fresh fallback table is built on every miss; nothing is cached.
		__index = function ()
			return { from_item = tostring };
		end;
	});
end

-- Shared module state.
-- pubsub_services:    host -> pubsub service object (filled in module.add_host)
-- pubsub_subscribers: host -> node -> session -> payload type (filled by SUBSCRIBE)
-- packet_handlers:    MQTT packet type -> function (session, packet)
local pubsub_services = {};
local pubsub_subscribers = {};
local packet_handlers = {};

--- Log a decoded MQTT packet and hand it to the handler for its type.
-- Packets whose type has no entry in packet_handlers are logged and dropped.
-- @param session the session table the packet arrived on
-- @param packet decoded packet; its `type` field selects the handler
function handle_packet(session, packet)
	module:log("debug", "MQTT packet received! Length: %d", packet.length);
	-- Dump every field of the packet to the debug log.
	for field, value in pairs(packet) do
		module:log("debug", "MQTT %s: %s", tostring(field), tostring(value));
	end
	local dispatch = packet_handlers[packet.type];
	if dispatch then
		dispatch(session, packet);
	else
		module:log("warn", "Unhandled command: %s", tostring(packet.type));
	end
end

--- Answer an MQTT CONNECT with a CONNACK.
-- The CONNACK body is always two zero bytes; the CONNECT packet itself is
-- not inspected.
-- NOTE(review): presumably the second zero byte is the "accepted" return
-- code - confirm against the MQTT specification.
function packet_handlers.connect(session, packet)
	local connack = mqtt.serialize_packet({
		type = "connack",
		data = string.char(0x00, 0x00),
	});
	session.conn:write(connack);
end

--- Handle an MQTT DISCONNECT by closing the session's connection.
function packet_handlers.disconnect(session, packet)
	local conn = session.conn;
	conn:close();
end

--- Handle an MQTT PUBLISH by publishing its payload to a pubsub node.
-- The topic must have the form HOST/TYPE/NODE: HOST selects the pubsub
-- service, TYPE selects the payload translator and NODE is the target node.
-- Any failure is logged as a warning and the packet is dropped.
-- @param session the publishing session (not used here)
-- @param packet decoded packet providing `topic` and `data`
function packet_handlers.publish(session, packet)
	local topic = packet.topic;
	module:log("info", "PUBLISH to %s", topic);

	local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
	if not host then
		module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
		return;
	end

	local service = pubsub_services[host];
	if not service then
		module:log("warn", "Unable to locate host/node: %s", packet.topic);
		return;
	end

	-- Only translators that can build an item are usable for publishing.
	local translator = data_translators[payload_type];
	local to_item = translator and translator.to_item;
	if not to_item then
		module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic);
		return;
	end

	local item = to_item(packet.data);
	-- `true` is passed as the actor argument of publish().
	local published, publish_err = service:publish(node, true, item.attr.id, item);
	if not published then
		module:log("warn", "Error publishing MQTT data: %s", tostring(publish_err));
	end
end

--- Handle an MQTT SUBSCRIBE by registering the session for each topic.
-- Topics must have the form HOST/TYPE/NODE. The session is recorded under
-- pubsub_subscribers[HOST][NODE] with TYPE as the payload type it wants, and
-- the full topic is remembered in session.subscriptions.
-- Processing stops at the first topic that is malformed or names an unknown
-- host; topics listed after it are not registered.
-- @param session the subscribing session
-- @param packet decoded packet providing the `topics` list
function packet_handlers.subscribe(session, packet)
	for _, topic in ipairs(packet.topics) do
		module:log("info", "SUBSCRIBE to %s", topic);

		local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
		if not host then
			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
			return;
		end

		local host_subscribers = pubsub_subscribers[host];
		if not host_subscribers then
			module:log("warn", "Unable to locate host/node: %s", topic);
			return;
		end

		-- Create the per-node subscriber table the first time it is needed.
		local node_sessions = host_subscribers[node];
		if not node_sessions then
			node_sessions = {};
			host_subscribers[node] = node_sessions;
		end

		session.subscriptions[topic] = payload_type;
		node_sessions[session] = payload_type;
	end
end

--- Answer an MQTT PINGREQ with a PINGRESP.
function packet_handlers.pingreq(session, packet)
	local pingresp = mqtt.serialize_packet({ type = "pingresp" });
	session.conn:write(pingresp);
end

-- conn -> session table; entries are added in mqtt_listener.onconnect and
-- removed in mqtt_listener.ondisconnect.
local sessions = {};

-- Connection callbacks, registered with module:provides("net", ...).
local mqtt_listener = {};

--- Create the session for a newly connected client.
-- The session holds the connection, a fresh MQTT stream parser and an empty
-- topic -> payload type subscription map.
function mqtt_listener.onconnect(conn)
	local session = {
		conn = conn;
		stream = mqtt.new_stream();
		subscriptions = {};
	};
	sessions[conn] = session;
end

--- Feed received bytes to the session's MQTT stream and dispatch the result.
-- Data arriving on a connection that has no session is ignored.
-- @param conn the connection the data arrived on
-- @param data the received data
function mqtt_listener.onincoming(conn, data)
	local session = sessions[conn];
	if not session then
		return;
	end
	-- feed() returns the list of packets completed by this chunk of data.
	local decoded = session.stream:feed(data);
	for index = 1, #decoded do
		handle_packet(session, decoded[index]);
	end
end

--- Tear down the session of a disconnected client.
-- Removes the session from every node subscriber table it was registered in
-- and forgets the session itself. A connection without a session is only
-- logged.
-- @param conn the connection that was closed
function mqtt_listener.ondisconnect(conn)
	local session = sessions[conn];
	if session then
		for topic in pairs(session.subscriptions) do
			-- Subscriptions are stored under the full HOST/TYPE/NODE topic,
			-- while subscriber tables are keyed by NODE alone, so the TYPE
			-- segment has to be skipped when recovering the node name.
			local host, node = topic:match("^([^/]+)/[^/]+/(.+)$");
			local subs = pubsub_subscribers[host];
			if subs then
				local node_subs = subs[node];
				if node_subs then
					node_subs[session] = nil;
					-- Drop the per-node table once its last subscriber is gone.
					if next(node_subs) == nil then
						subs[node] = nil;
					end
				end
			end
		end
		sessions[conn] = nil;
	end
	module:log("debug", "MQTT client disconnected");
end

-- Register the MQTT listener as a "net" service with 1883 as its default port.
module:provides("net", {
	default_port = 1883;
	listener = mqtt_listener;
});

--- Hook a host into the MQTT bridge.
-- Does nothing unless the host has a `pubsub` module loaded. Otherwise the
-- host's pubsub service is made available to MQTT PUBLISH packets, and an
-- "item-published" handler is installed that forwards each published item to
-- the MQTT sessions subscribed to its node.
-- @param module the module object for the host being added (its `host`
--   field names the host)
function module.add_host(module)
	local pubsub_module = hosts[module.host].modules.pubsub
	if pubsub_module then
		module:log("debug", "MQTT enabled for %s", module.host);
		module:depends("pubsub");
		-- Keep one reference to the service so the handler is later removed
		-- from the same events object it was added to.
		local service = assert(pubsub_module.service);
		pubsub_services[module.host] = service;
		local subscribers = {};
		pubsub_subscribers[module.host] = subscribers;
		local function handle_publish(event)
			-- Build MQTT packets lazily: one serialized packet per payload
			-- type, created the first time a subscriber asks for that type.
			local packet_types = setmetatable({}, {
				__index = function (self, payload_type)
					local packet = mqtt.serialize_packet{
						type = "publish";
						id = "\000\000";
						topic = module.host.."/"..payload_type.."/"..event.node;
						data = data_translators[payload_type].from_item(event.item) or "";
					};
					-- Cache under the payload type so later subscribers of
					-- the same type reuse this packet.
					rawset(self, payload_type, packet);
					return packet;
				end;
			});
			-- Broadcast to subscribers
			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);
			for session, payload_type in pairs(subscribers[event.node] or {}) do
				session.conn:write(packet_types[payload_type]);
				module:log("debug", "Sent to %s", tostring(session));
			end
		end
		service.events.add_handler("item-published", handle_publish);
		function module.unload()
			module:log("debug", "MQTT disabled for %s", module.host);
			-- Remove through `events`, mirroring how the handler was added.
			service.events.remove_handler("item-published", handle_publish);
			pubsub_services[module.host] = nil;
			pubsub_subscribers[module.host] = nil;
		end
	end
end