File

mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 4730:1da4b815d2fe

mod_cloud_notify: Identify (and immediately push) urgent stanzas, e.g. calls This covers the following things: - A session that appears online, but has a broken TCP connection - Clients such as Siskin and Snikket iOS that require a push for calls to work It allows the stanza to be pushed immediately instead of waiting for the session to hibernate or an ack to timeout. It shouldn't break any existing cases.
author Matthew Wild <mwild1@gmail.com>
date Wed, 27 Oct 2021 19:12:03 +0100
parent 1343:7dbde05b48a9
child 5111:6ba2188e2686
line wrap: on
line source

module:set_global();

local mqtt = module:require "mqtt";
local st = require "util.stanza";

local pubsub_services = {};
local pubsub_subscribers = {};
local packet_handlers = {};

function handle_packet(session, packet)
	module:log("warn", "MQTT packet received! Length: %d", packet.length);
	for k,v in pairs(packet) do
		module:log("debug", "MQTT %s: %s", tostring(k), tostring(v));
	end
	local handler = packet_handlers[packet.type];
	if not handler then
		module:log("warn", "Unhandled command: %s", tostring(packet.type));
		return;
	end
	handler(session, packet);
end

function packet_handlers.connect(session, packet)
	session.conn:write(mqtt.serialize_packet{
		type = "connack";
		data = string.char(0x00, 0x00);
	});
end

function packet_handlers.disconnect(session, packet)
	session.conn:close();
end

function packet_handlers.publish(session, packet)
	module:log("warn", "PUBLISH to %s", packet.topic);
	local host, node = packet.topic:match("^([^/]+)/(.+)$");
	local pubsub = pubsub_services[host];
	if not pubsub then
		module:log("warn", "Unable to locate host/node: %s", packet.topic);
		return;
	end
	local id = "mqtt";
	local ok, err = pubsub:publish(node, true, id,
		st.stanza("data", { xmlns = "https://prosody.im/protocol/data" })
			:text(packet.data)
	);
	if not ok then
		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
	end
end

function packet_handlers.subscribe(session, packet)
	for _, topic in ipairs(packet.topics) do
		module:log("warn", "SUBSCRIBE to %s", topic);
		local host, node = topic:match("^([^/]+)/(.+)$");
		local pubsub = pubsub_subscribers[host];
		if not pubsub then
			module:log("warn", "Unable to locate host/node: %s", topic);
			return;
		end
		local node_subs = pubsub[node];
		if not node_subs then
			node_subs = {};
			pubsub[node] = node_subs;
		end
		session.subscriptions[topic] = true;
		node_subs[session] = true;
	end

end

function packet_handlers.pingreq(session, packet)
	session.conn:write(mqtt.serialize_packet{type = "pingresp"});
end

local sessions = {};

local mqtt_listener = {};

function mqtt_listener.onconnect(conn)
	sessions[conn] = {
		conn = conn;
		stream = mqtt.new_stream();
		subscriptions = {};
	};
end

function mqtt_listener.onincoming(conn, data)
	local session = sessions[conn];
	if session then
		local packets = session.stream:feed(data);
		for i = 1, #packets do
			handle_packet(session, packets[i]);
		end
	end
end

function mqtt_listener.ondisconnect(conn)
	local session = sessions[conn];
	for topic in pairs(session.subscriptions) do
		local host, node = topic:match("^([^/]+)/(.+)$");
		local subs = pubsub_subscribers[host];
		if subs then
			local node_subs = subs[node];
			if node_subs then
				node_subs[session] = nil;
			end
		end
	end
	sessions[conn] = nil;
	module:log("debug", "MQTT client disconnected");
end

module:provides("net", {
	default_port = 1883;
	listener = mqtt_listener;
});

local function tostring_content(item)
	return tostring(item[1]);
end

local data_translators = setmetatable({
	["data https://prosody.im/protocol/data"] = tostring_content;
	["json urn:xmpp:json:0"] = tostring_content;
}, {
	__index = function () return tostring; end;
});

function module.add_host(module)
	local pubsub_module = hosts[module.host].modules.pubsub
	if pubsub_module then
		module:log("debug", "MQTT enabled for %s", module.host);
		module:depends("pubsub");
		pubsub_services[module.host] = assert(pubsub_module.service);
		local subscribers = {};
		pubsub_subscribers[module.host] = subscribers;
		local function handle_publish(event)
			-- Build MQTT packet
			local packet = mqtt.serialize_packet{
				type = "publish";
				id = "\000\000";
				topic = module.host.."/"..event.node;
				data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item);
			};
			-- Broadcast to subscribers
			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
			for session in pairs(subscribers[event.node] or {}) do
				session.conn:write(packet);
				module:log("debug", "Sent to %s", tostring(session));
			end
		end
		pubsub_services[module.host].events.add_handler("item-published", handle_publish);
		function module.unload()
			module:log("debug", "MQTT disabled for %s", module.host);
			pubsub_module.service.remove_handler("item-published", handle_publish);
			pubsub_services[module.host] = nil;
			pubsub_subscribers[module.host] = nil;
		end
	end
end