Software /
code /
prosody-modules
Comparison
mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 5113:85a7304cfea1
mod_pubsub_mqtt: Support atom_title payload type
This commit adds the ability to publish and subscribe with arbitrary payload
types. Note that this is a breaking change, because topics now take the form:
HOST/TYPE/NODE
Currently supported types are utf8 (the key registered in this revision's code; not utf8_data), json and atom_title.
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 16 Dec 2022 22:16:45 +0000 |
parent | 5112:9499b88f3453 |
child | 5854:801f64e6d4e9 |
comparison
equal
deleted
inserted
replaced
5112:9499b88f3453 | 5113:85a7304cfea1 |
---|---|
1 module:set_global(); | 1 module:set_global(); |
2 | 2 |
3 local mqtt = module:require "mqtt"; | 3 local mqtt = module:require "mqtt"; |
4 local id = require "util.id"; | |
4 local st = require "util.stanza"; | 5 local st = require "util.stanza"; |
6 | |
7 local function tostring_content(item) | |
8 return tostring(item[1]); | |
9 end | |
10 | |
11 local data_translators = setmetatable({ | |
12 utf8 = { | |
13 from_item = function (item) | |
14 return item:find("{https://prosody.im/protocol/data}data#"); | |
15 end; | |
16 to_item = function (payload) | |
17 return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() }) | |
18 :text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" }) | |
19 end; | |
20 }; | |
21 json = { | |
22 from_item = function (item) | |
23 return item:find("{urn:xmpp:json:0}json#"); | |
24 end; | |
25 to_item = function (payload) | |
26 return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() }) | |
27 :text_tag("json", payload, { xmlns = "urn:xmpp:json:0" }); | |
28 end; | |
29 }; | |
30 atom_title = { | |
31 from_item = function (item) | |
32 return item:find("{http://www.w3.org/2005/Atom}entry/title#"); | |
33 end; | |
34 to_item = function (payload) | |
35 return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() }) | |
36 :tag("entry", { xmlns = "http://www.w3.org/2005/Atom" }) | |
37 :text_tag("title", payload, { type = "text" }); | |
38 end; | |
39 }; | |
40 }, { | |
41 __index = function () return { from_item = tostring }; end; | |
42 }); | |
5 | 43 |
6 local pubsub_services = {}; | 44 local pubsub_services = {}; |
7 local pubsub_subscribers = {}; | 45 local pubsub_subscribers = {}; |
8 local packet_handlers = {}; | 46 local packet_handlers = {}; |
9 | 47 |
31 session.conn:close(); | 69 session.conn:close(); |
32 end | 70 end |
33 | 71 |
34 function packet_handlers.publish(session, packet) | 72 function packet_handlers.publish(session, packet) |
35 module:log("info", "PUBLISH to %s", packet.topic); | 73 module:log("info", "PUBLISH to %s", packet.topic); |
36 local host, node = packet.topic:match("^([^/]+)/(.+)$"); | 74 local host, payload_type, node = packet.topic:match("^([^/]+)/([^/]+)/(.+)$"); |
75 if not host then | |
76 module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE"); | |
77 return; | |
78 end | |
37 local pubsub = pubsub_services[host]; | 79 local pubsub = pubsub_services[host]; |
38 if not pubsub then | 80 if not pubsub then |
39 module:log("warn", "Unable to locate host/node: %s", packet.topic); | 81 module:log("warn", "Unable to locate host/node: %s", packet.topic); |
40 return; | 82 return; |
41 end | 83 end |
42 local id = "mqtt"; | 84 |
43 local ok, err = pubsub:publish(node, true, id, | 85 local payload_translator = data_translators[payload_type]; |
44 st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id }) | 86 if not payload_translator or not payload_translator.to_item then |
45 :text_tag("data", packet.data, { xmlns = "https://prosody.im/protocol/data" }) | 87 module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic); |
46 ); | 88 return; |
89 end | |
90 | |
91 local payload_item = payload_translator.to_item(packet.data); | |
92 local ok, err = pubsub:publish(node, true, payload_item.attr.id, payload_item); | |
47 if not ok then | 93 if not ok then |
48 module:log("warn", "Error publishing MQTT data: %s", tostring(err)); | 94 module:log("warn", "Error publishing MQTT data: %s", tostring(err)); |
49 end | 95 end |
50 end | 96 end |
51 | 97 |
52 function packet_handlers.subscribe(session, packet) | 98 function packet_handlers.subscribe(session, packet) |
53 for _, topic in ipairs(packet.topics) do | 99 for _, topic in ipairs(packet.topics) do |
54 module:log("info", "SUBSCRIBE to %s", topic); | 100 module:log("info", "SUBSCRIBE to %s", topic); |
55 local host, node = topic:match("^([^/]+)/(.+)$"); | 101 local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$"); |
102 if not host then | |
103 module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE"); | |
104 return; | |
105 end | |
56 local pubsub = pubsub_subscribers[host]; | 106 local pubsub = pubsub_subscribers[host]; |
57 if not pubsub then | 107 if not pubsub then |
58 module:log("warn", "Unable to locate host/node: %s", topic); | 108 module:log("warn", "Unable to locate host/node: %s", topic); |
59 return; | 109 return; |
60 end | 110 end |
61 local node_subs = pubsub[node]; | 111 local node_subs = pubsub[node]; |
62 if not node_subs then | 112 if not node_subs then |
63 node_subs = {}; | 113 node_subs = {}; |
64 pubsub[node] = node_subs; | 114 pubsub[node] = node_subs; |
65 end | 115 end |
66 session.subscriptions[topic] = true; | 116 session.subscriptions[topic] = payload_type; |
67 node_subs[session] = true; | 117 node_subs[session] = payload_type; |
68 end | 118 end |
69 | 119 |
70 end | 120 end |
71 | 121 |
72 function packet_handlers.pingreq(session, packet) | 122 function packet_handlers.pingreq(session, packet) |
114 module:provides("net", { | 164 module:provides("net", { |
115 default_port = 1883; | 165 default_port = 1883; |
116 listener = mqtt_listener; | 166 listener = mqtt_listener; |
117 }); | 167 }); |
118 | 168 |
119 local function tostring_content(item) | |
120 return tostring(item[1]); | |
121 end | |
122 | |
123 local data_translators = setmetatable({ | |
124 ["data https://prosody.im/protocol/data"] = tostring_content; | |
125 ["json urn:xmpp:json:0"] = tostring_content; | |
126 }, { | |
127 __index = function () return tostring; end; | |
128 }); | |
129 | |
130 function module.add_host(module) | 169 function module.add_host(module) |
131 local pubsub_module = hosts[module.host].modules.pubsub | 170 local pubsub_module = hosts[module.host].modules.pubsub |
132 if pubsub_module then | 171 if pubsub_module then |
133 module:log("debug", "MQTT enabled for %s", module.host); | 172 module:log("debug", "MQTT enabled for %s", module.host); |
134 module:depends("pubsub"); | 173 module:depends("pubsub"); |
135 pubsub_services[module.host] = assert(pubsub_module.service); | 174 pubsub_services[module.host] = assert(pubsub_module.service); |
136 local subscribers = {}; | 175 local subscribers = {}; |
137 pubsub_subscribers[module.host] = subscribers; | 176 pubsub_subscribers[module.host] = subscribers; |
138 local function handle_publish(event) | 177 local function handle_publish(event) |
139 -- Build MQTT packet | 178 -- Build MQTT packet |
140 local packet = mqtt.serialize_packet{ | 179 local packet_types = setmetatable({}, { |
141 type = "publish"; | 180 __index = function (self, payload_type) |
142 id = "\000\000"; | 181 local packet = mqtt.serialize_packet{ |
143 topic = module.host.."/"..event.node; | 182 type = "publish"; |
144 data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item); | 183 id = "\000\000"; |
145 }; | 184 topic = module.host.."/"..payload_type.."/"..event.node; |
185 data = data_translators[payload_type].from_item(event.item) or ""; | |
186 }; | |
187 rawset(self, packet); | |
188 return packet; | |
189 end; | |
190 }); | |
146 -- Broadcast to subscribers | 191 -- Broadcast to subscribers |
147 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node); | 192 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node); |
148 for session in pairs(subscribers[event.node] or {}) do | 193 for session, payload_type in pairs(subscribers[event.node] or {}) do |
149 session.conn:write(packet); | 194 session.conn:write(packet_types[payload_type]); |
150 module:log("debug", "Sent to %s", tostring(session)); | 195 module:log("debug", "Sent to %s", tostring(session)); |
151 end | 196 end |
152 end | 197 end |
153 pubsub_services[module.host].events.add_handler("item-published", handle_publish); | 198 pubsub_services[module.host].events.add_handler("item-published", handle_publish); |
154 function module.unload() | 199 function module.unload() |