Software /
code /
prosody-modules
Changeset
5857:58df53eefa28
mod_pubsub_mqtt: Update to MQTT 3.1.1
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 07 Feb 2024 11:57:30 +0000 |
parents | 5855:5afc8273c5ef |
children | 5858:866a49f5aa61 |
files | mod_pubsub_mqtt/README.markdown mod_pubsub_mqtt/mod_pubsub_mqtt.lua mod_pubsub_mqtt/mqtt.lib.lua |
diffstat | 3 files changed, 110 insertions(+), 41 deletions(-) [+] |
line wrap: on
line diff
--- a/mod_pubsub_mqtt/README.markdown Tue Jan 30 14:26:14 2024 +0000 +++ b/mod_pubsub_mqtt/README.markdown Wed Feb 07 11:57:30 2024 +0000 @@ -11,35 +11,38 @@ to embedded devices. This module provides a way for MQTT clients to connect to Prosody and publish or subscribe to local pubsub nodes. +The module currently implements MQTT version 3.1.1. + Details ------- MQTT has the concept of 'topics' (similar to XMPP's pubsub 'nodes'). mod\_pubsub\_mqtt maps pubsub nodes to MQTT topics of the form -`HOST/NODE`, e.g.`pubsub.example.org/mynode`. +`<HOST>/<TYPE>/<NODE>`, e.g.`pubsub.example.org/json/mynode`. + +The 'TYPE' parameter in the topic allows the client to choose the payload +format it will send/receive. For the supported values of 'TYPE' see the +'Payloads' section below. ### Limitations The current implementation is quite basic, and in particular: - Authentication is not supported -- SSL/TLS is not supported - Only QoS level 0 is supported ### Payloads XMPP payloads are always XML, but MQTT does not define a payload format. -Therefore mod\_pubsub\_mqtt will attempt to convert data of certain -recognised payload types. Currently supported: +Therefore mod\_pubsub\_mqtt has some built-in data format translators. + +Currently supported data types: -- JSON (see [XEP-0335](http://xmpp.org/extensions/xep-0335.html) for - the format) -- Plain UTF-8 text (wrapped inside +- `json`: See [XEP-0335](http://xmpp.org/extensions/xep-0335.html) for + the format. +- `utf8`: Plain UTF-8 text (wrapped inside `<data xmlns="https://prosody.im/protocol/mqtt"/>`) - -All other XMPP payload types are sent to the client directly as XML. -Data published by MQTT clients is currently never translated, and always -treated as UTF-8 text. +- `atom_title`: Returns the title of an Atom entry as UTF-8 data Configuration ------------- @@ -51,16 +54,15 @@ modules_enabled = { "pubsub_mqtt" } You may also configure which port(s) mod\_pubsub\_mqtt listens on using -Prosody's standard config directives, such as `mqtt_ports`. Network -settings **must** be specified in the global section of the config file, -not under any particular pubsub component. The default port is 1883 -(MQTT's standard port number). +Prosody's standard config directives, such as `mqtt_ports` and +`mqtt_tls_ports`. Network settings **must** be specified in the global section +of the config file, not under any particular pubsub component. The default +port is 1883 (MQTT's standard port number) and 8883 for TLS connections. Compatibility ------------- ------- -------------- trunk Works - 0.9 Works - 0.8 Doesn't work + 0.12 Works ------- --------------
--- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Tue Jan 30 14:26:14 2024 +0000 +++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Wed Feb 07 11:57:30 2024 +0000 @@ -59,6 +59,15 @@ end function packet_handlers.connect(session, packet) + module:log("info", "MQTT client connected (sending connack)"); + module:log("debug", "MQTT version: %02x", packet.version); + if packet.version ~= 0x04 then -- Version mismatch + session.conn:write(mqtt.serialize_packet{ + type = "connack"; + data = string.char(0x00, 0x01); + }); + return; + end session.conn:write(mqtt.serialize_packet{ type = "connack"; data = string.char(0x00, 0x00); @@ -96,27 +105,33 @@ end function packet_handlers.subscribe(session, packet) - for _, topic in ipairs(packet.topics) do + local results = {}; + for i, topic in ipairs(packet.topics) do module:log("info", "SUBSCRIBE to %s", topic); local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$"); if not host then module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE"); - return; - end - local pubsub = pubsub_subscribers[host]; - if not pubsub then - module:log("warn", "Unable to locate host/node: %s", topic); - return; + results[i] = 0x80; -- Failure + else + local pubsub = pubsub_subscribers[host]; + if not pubsub then + module:log("warn", "Unable to locate host/node: %s", topic); + results[i] = 0x80; -- Failure + else + local node_subs = pubsub[node]; + if not node_subs then + node_subs = {}; + pubsub[node] = node_subs; + end + session.subscriptions[topic] = payload_type; + node_subs[session] = payload_type; + module:log("debug", "Successfully subscribed to %s", topic); + results[i] = 0x00; -- Success + end end - local node_subs = pubsub[node]; - if not node_subs then - node_subs = {}; - pubsub[node] = node_subs; - end - session.subscriptions[topic] = payload_type; - node_subs[session] = payload_type; end - + local ack = mqtt.serialize_packet{ type = "suback", id = packet.id, results = results }; + session.conn:write(ack); end function packet_handlers.pingreq(session, packet) @@ -191,7 +206,7 @@ topic = module.host.."/"..payload_type.."/"..event.node; data = data_translators[payload_type].from_item(event.item) or ""; }; - rawset(self, packet); + rawset(self, payload_type, packet); return packet; end; });
--- a/mod_pubsub_mqtt/mqtt.lib.lua Tue Jan 30 14:26:14 2024 +0000 +++ b/mod_pubsub_mqtt/mqtt.lib.lua Wed Feb 07 11:57:30 2024 +0000 @@ -1,4 +1,4 @@ -local bit = require "bit"; +local bit = require "util.bitcompat"; local stream_mt = {}; stream_mt.__index = stream_mt; @@ -29,10 +29,25 @@ return self:read_bytes(len), len+2; end +function stream_mt:read_word() + local len1, len2 = self:read_bytes(2):byte(1,2); + local result = bit.lshift(len1, 8) + len2; + module:log("debug", "read_word(%02x, %02x) = %04x (%d)", len1, len2, result, result); + return result; +end + +local function hasbit(byte, n_bit) + return bit.band(byte, 2^n_bit) ~= 0; +end + +local function encode_string(str) + return string.char(bit.band(#str, 0xff00), bit.band(#str, 0x00ff))..str; +end + local packet_type_codes = { "connect", "connack", "publish", "puback", "pubrec", "pubrel", "pubcomp", - "subscribe", "subak", "unsubscribe", "unsuback", + "subscribe", "suback", "unsubscribe", "unsuback", "pingreq", "pingresp", "disconnect" }; @@ -59,9 +74,46 @@ packet.type = nil; -- Invalid packet else packet.version = self:read_bytes(1):byte(); - packet.connect_flags = self:read_bytes(1):byte(); - packet.keepalive_timer = self:read_bytes(1):byte(); + module:log("debug", "ver: %02x", packet.version); + if packet.version ~= 0x04 then + module:log("warn", "MQTT version mismatch (got %02x, we support %02x", packet.version, 0x04); + end + local flags = self:read_bytes(1):byte(); + module:log("debug", "flags: %02x", flags); + packet.keepalive_timer = self:read_bytes(2):byte(); + module:log("debug", "keepalive: %d", packet.keepalive_timer); + packet.connect_flags = {}; length = length - 11; + packet.connect_flags = { + clean_session = hasbit(flags, 1); + will = hasbit(flags, 2); + will_qos = bit.band(bit.rshift(flags, 2), 0x02); + will_retain = hasbit(flags, 5); + user_name = hasbit(flags, 7); + password = hasbit(flags, 6); + }; + module:log("debug", "%s", require "util.serialization".serialize(packet.connect_flags, "debug")); + module:log("debug", "Reading client_id..."); + packet.client_id = self:read_string(); + if packet.connect_flags.will then + module:log("debug", "Reading will..."); + packet.will = { + topic = self:read_string(); + message = self:read_string(); + qos = packet.connect_flags.will_qos; + retain = packet.connect_flags.will_retain; + }; + end + if packet.connect_flags.user_name then + module:log("debug", "Reading username..."); + packet.username = self:read_string(); + end + if packet.connect_flags.password then + module:log("debug", "Reading password..."); + packet.password = self:read_string(); + end + module:log("debug", "Done parsing connect!"); + length = 0; -- No payload left end elseif packet.type == "publish" then packet.topic = self:read_string(); @@ -87,6 +139,7 @@ if length > 0 then packet.data = self:read_bytes(length); end + module:log("debug", "MQTT packet complete!"); return packet; end @@ -102,7 +155,6 @@ end function stream_mt:feed(data) - module:log("debug", "Feeding %d bytes", #data); local packets = {}; local packet = self.parser(data); while packet do @@ -135,10 +187,10 @@ packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data; elseif packet.type == "suback" then local t = {}; - for _, topic in ipairs(packet.topics) do - table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000"); + for i, result_code in ipairs(packet.results) do + table.insert(t, string.char(result_code)); end - packet.data = table.concat(t); + packet.data = packet.id..table.concat(t); end -- Get length