Changeset

1240:e0d97eb52ab8

mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
author Matthew Wild <mwild1@gmail.com>
date Sun, 01 Dec 2013 19:12:08 +0000
parents 1239:cc5cbeeb9fc7
children 1241:2380a5d71448
files mod_pubsub_mqtt/mod_pubsub_mqtt.lua mod_pubsub_mqtt/mqtt.lib.lua
diffstat 2 files changed, 322 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Sun Dec 01 19:12:08 2013 +0000
@@ -0,0 +1,161 @@
+module:set_global();
+
+local mqtt = module:require "mqtt";
+local st = require "util.stanza";
+
+local pubsub_services = {};
+local pubsub_subscribers = {};
+local packet_handlers = {};
+
+function handle_packet(session, packet)
+	module:log("warn", "MQTT packet received! Length: %d", packet.length);
+	for k,v in pairs(packet) do
+		module:log("debug", "MQTT %s: %s", tostring(k), tostring(v));
+	end
+	local handler = packet_handlers[packet.type];
+	if not handler then
+		module:log("warn", "Unhandled command: %s", tostring(packet.type));
+		return;
+	end
+	handler(session, packet);
+end
+
+function packet_handlers.connect(session, packet)
+	session.conn:write(mqtt.serialize_packet{
+		type = "connack";
+		data = string.char(0x00, 0x00);
+	});
+end
+
+function packet_handlers.disconnect(session, packet)
+	session.conn:close();
+end
+
+function packet_handlers.publish(session, packet)
+	module:log("warn", "PUBLISH to %s", packet.topic);
+	local host, node = packet.topic:match("^([^/]+)/(.+)$");
+	local pubsub = pubsub_services[host];
+	if not pubsub then
+		module:log("warn", "Unable to locate host/node: %s", packet.topic);
+		return;
+	end
+	local id = "mqtt";
+	local ok, err = pubsub:publish(node, true, id,
+		st.stanza("data", { xmlns = "https://prosody.im/protocol/mqtt" })
+			:text(packet.data)
+	);
+	if not ok then
+		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
+	end
+end
+
+function packet_handlers.subscribe(session, packet)
+	for _, topic in ipairs(packet.topics) do
+		module:log("warn", "SUBSCRIBE to %s", topic);
+		local host, node = topic:match("^([^/]+)/(.+)$");
+		local pubsub = pubsub_subscribers[host];
+		if not pubsub then
+			module:log("warn", "Unable to locate host/node: %s", topic);
+			return;
+		end
+		local node_subs = pubsub[node];
+		if not node_subs then
+			node_subs = {};
+			pubsub[node] = node_subs;
+		end
+		session.subscriptions[topic] = true;
+		node_subs[session] = true;
+	end
+	
+end
+
+function packet_handlers.pingreq(session, packet)
+	session.conn:write(mqtt.serialize_packet{type = "pingresp"});
+end
+
+local sessions = {};
+
+local mqtt_listener = {};
+
+function mqtt_listener.onconnect(conn)
+	sessions[conn] = {
+		conn = conn;
+		stream = mqtt.new_stream();
+		subscriptions = {};
+	};
+end
+
+function mqtt_listener.onincoming(conn, data)
+	local session = sessions[conn];
+	if session then
+		local packets = session.stream:feed(data);
+		for i = 1, #packets do
+			handle_packet(session, packets[i]);
+		end
+	end
+end
+
+function mqtt_listener.ondisconnect(conn)
+	local session = sessions[conn];
+	for topic in pairs(session.subscriptions) do
+		local host, node = topic:match("^([^/]+)/(.+)$");
+		local subs = pubsub_subscribers[host];
+		if subs then
+			local node_subs = subs[node];
+			if node_subs then
+				node_subs[session] = nil;
+			end
+		end
+	end
+	sessions[conn] = nil;
+	module:log("debug", "MQTT client disconnected");
+end
+
+module:provides("net", {
+	default_port = 1883;
+	listener = mqtt_listener;
+});
+
+local function tostring_content(item)
+	return tostring(item[1]);
+end
+
+local data_translators = setmetatable({
+	["data https://prosody.im/protocol/mqtt"] = tostring_content;
+	["json urn:xmpp:json:0"] = tostring_content;
+}, {
+	__index = function () return tostring; end;
+});
+
+function module.add_host(module)
+	local pubsub_module = hosts[module.host].modules.pubsub
+	if pubsub_module then
+		module:log("debug", "MQTT enabled for %s", module.host);
+		module:depends("pubsub");
+		pubsub_services[module.host] = assert(pubsub_module.service);
+		local subscribers = {};
+		pubsub_subscribers[module.host] = subscribers;
+		local function handle_publish(event)
+			-- Build MQTT packet
+			local packet = mqtt.serialize_packet{
+				type = "publish";
+				id = "\000\000";
+				topic = module.host.."/"..event.node;
+				data = data_translators[event.item.name.." "..event.item.attr.xmlns](event.item);
+			};
+			-- Broadcast to subscribers
+			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
+			for session in pairs(subscribers[event.node] or {}) do
+				session.conn:write(packet);
+				module:log("debug", "Sent to %s", tostring(session));
+			end
+		end
+		pubsub_services[module.host].events.add_handler("item-published", handle_publish);
+		function module.unload()
+			module:log("debug", "MQTT disabled for %s", module.host);
+			pubsub_module.service.remove_handler("item-published", handle_publish);
+			pubsub_services[module.host] = nil;
+			pubsub_subscribers[module.host] = nil;
+		end
+	end
+end
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_mqtt/mqtt.lib.lua	Sun Dec 01 19:12:08 2013 +0000
@@ -0,0 +1,161 @@
+local bit = require "bit";
+
+local stream_mt = {};
+stream_mt.__index = stream_mt;
+
+function stream_mt:read_bytes(n_bytes)
+	module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, #self.buffer);
+	local data = self.buffer;
+	if not data then
+		module:log("debug", "No data, pausing.");
+		data = coroutine.yield();
+		module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes);
+	end
+	if #data >= n_bytes then
+		data, self.buffer = data:sub(1, n_bytes), data:sub(n_bytes+1);
+	elseif #data < n_bytes then
+		module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
+		self.buffer = data..coroutine.yield();
+		module:log("debug", "Now we have %d bytes, reading...", #data);
+		return self:read_bytes(n_bytes);
+	end
+	module:log("debug", "Returning %d bytes (buffer: %d)", #data, #self.buffer);
+	return data;
+end
+
+function stream_mt:read_string()
+	local len1, len2 = self:read_bytes(2):byte(1,2);
+	local len = bit.lshift(len1, 8) + len2;
+	return self:read_bytes(len), len+2;
+end
+
+local packet_type_codes = {
+	"connect", "connack",
+	"publish", "puback", "pubrec", "pubrel", "pubcomp",
+	"subscribe", "subak", "unsubscribe", "unsuback",
+	"pingreq", "pingresp",
+	"disconnect"
+};
+
+function stream_mt:read_packet()
+	local packet = {};
+	local header = self:read_bytes(1):byte();
+	packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
+	packet.dup = bit.band(header, 0x08) == 0x08;
+	packet.qos = bit.rshift(bit.band(header, 0x06), 1);
+	packet.retain = bit.band(header, 0x01) == 0x01;
+	
+	-- Get length
+	local length, multiplier = 0, 1;
+	repeat
+		local digit = self:read_bytes(1):byte();
+		length = length + bit.band(digit, 0x7f)*multiplier;
+		multiplier = multiplier*128;
+	until bit.band(digit, 0x80) == 0;
+	packet.length = length;
+	if packet.type == "connect" then
+		if self:read_string() ~= "MQIsdp" then
+			module:log("warn", "Unexpected packet signature!");
+			packet.type = nil; -- Invalid packet
+		else
+			packet.version = self:read_bytes(1):byte();
+			packet.connect_flags = self:read_bytes(1):byte();
+			packet.keepalive_timer = self:read_bytes(1):byte();
+			length = length - 11;
+		end
+	elseif packet.type == "publish" then
+		packet.topic = self:read_string();
+		length = length - (#packet.topic+2);
+		if packet.qos == 1 or packet.qos == 2 then
+			packet.id = self:read_bytes(2);
+			length = length - 2;
+		end
+	elseif packet.type == "subscribe" then
+		if packet.qos == 1 or packet.qos == 2 then
+			packet.id = self:read_bytes(2);
+			length = length - 2;
+		end
+		local topics = {};
+		while length > 0 do
+			local topic, len = self:read_string();
+			table.insert(topics, topic);
+			self:read_bytes(1); -- QoS not used
+			length = length - (len+1);
+		end
+		packet.topics = topics;
+	end
+	if length > 0 then
+		packet.data = self:read_bytes(length);
+	end
+	return packet;
+end
+
+local function new_parser(self)
+	return coroutine.wrap(function (data)
+		self.buffer = data;
+		while true do
+			data = coroutine.yield(self:read_packet());
+			module:log("debug", "Parser: %d new bytes", #data);
+			self.buffer = (self.buffer or "")..data;
+		end
+	end);
+end
+
+function stream_mt:feed(data)
+	module:log("debug", "Feeding %d bytes", #data);
+	local packets = {};
+	local packet = self.parser(data);
+	while packet do
+		module:log("debug", "Received packet");
+		table.insert(packets, packet);
+		packet = self.parser("");
+	end
+	module:log("debug", "Returning %d packets", #packets);
+	return packets;
+end
+
+local function new_stream()
+	local stream = setmetatable({}, stream_mt);
+	stream.parser = new_parser(stream);
+	return stream;
+end
+
+local function serialize_packet(packet)
+	local type_num = 0;
+	for i, v in ipairs(packet_type_codes) do -- FIXME: I'm so tired right now.
+		if v == packet.type then
+			type_num = i;
+			break;
+		end
+	end
+	local header = string.char(bit.lshift(type_num, 4));
+	
+	if packet.type == "publish" then
+		local topic = packet.topic or "";
+		packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data;
+	elseif packet.type == "suback" then
+		local t = {};
+		for _, topic in ipairs(packet.topics) do
+			table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000");
+		end
+		packet.data = table.concat(t);
+	end
+	
+	-- Get length
+	local length = #(packet.data or "");
+	repeat
+		local digit = length%128;
+		length = math.floor(length/128);
+		if length > 0 then
+			digit = bit.bor(digit, 0x80);
+		end
+		header = header..string.char(digit); -- FIXME: ...
+	until length <= 0;
+	
+	return header..(packet.data or "");
+end
+
+return {
+	new_stream = new_stream;
+	serialize_packet = serialize_packet;
+};