Software /
code /
prosody-modules
Comparison
mod_pubsub_mqtt/mod_pubsub_mqtt.lua @ 1240:e0d97eb52ab8
mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 01 Dec 2013 19:12:08 +0000 |
child | 1243:c2bf6b2102aa |
comparison
equal
deleted
inserted
replaced
1239:cc5cbeeb9fc7 | 1240:e0d97eb52ab8 |
---|---|
1 module:set_global(); | |
2 | |
3 local mqtt = module:require "mqtt"; | |
4 local st = require "util.stanza"; | |
5 | |
6 local pubsub_services = {}; | |
7 local pubsub_subscribers = {}; | |
8 local packet_handlers = {}; | |
9 | |
10 function handle_packet(session, packet) | |
11 module:log("warn", "MQTT packet received! Length: %d", packet.length); | |
12 for k,v in pairs(packet) do | |
13 module:log("debug", "MQTT %s: %s", tostring(k), tostring(v)); | |
14 end | |
15 local handler = packet_handlers[packet.type]; | |
16 if not handler then | |
17 module:log("warn", "Unhandled command: %s", tostring(packet.type)); | |
18 return; | |
19 end | |
20 handler(session, packet); | |
21 end | |
22 | |
23 function packet_handlers.connect(session, packet) | |
24 session.conn:write(mqtt.serialize_packet{ | |
25 type = "connack"; | |
26 data = string.char(0x00, 0x00); | |
27 }); | |
28 end | |
29 | |
30 function packet_handlers.disconnect(session, packet) | |
31 session.conn:close(); | |
32 end | |
33 | |
34 function packet_handlers.publish(session, packet) | |
35 module:log("warn", "PUBLISH to %s", packet.topic); | |
36 local host, node = packet.topic:match("^([^/]+)/(.+)$"); | |
37 local pubsub = pubsub_services[host]; | |
38 if not pubsub then | |
39 module:log("warn", "Unable to locate host/node: %s", packet.topic); | |
40 return; | |
41 end | |
42 local id = "mqtt"; | |
43 local ok, err = pubsub:publish(node, true, id, | |
44 st.stanza("data", { xmlns = "https://prosody.im/protocol/mqtt" }) | |
45 :text(packet.data) | |
46 ); | |
47 if not ok then | |
48 module:log("warn", "Error publishing MQTT data: %s", tostring(err)); | |
49 end | |
50 end | |
51 | |
52 function packet_handlers.subscribe(session, packet) | |
53 for _, topic in ipairs(packet.topics) do | |
54 module:log("warn", "SUBSCRIBE to %s", topic); | |
55 local host, node = topic:match("^([^/]+)/(.+)$"); | |
56 local pubsub = pubsub_subscribers[host]; | |
57 if not pubsub then | |
58 module:log("warn", "Unable to locate host/node: %s", topic); | |
59 return; | |
60 end | |
61 local node_subs = pubsub[node]; | |
62 if not node_subs then | |
63 node_subs = {}; | |
64 pubsub[node] = node_subs; | |
65 end | |
66 session.subscriptions[topic] = true; | |
67 node_subs[session] = true; | |
68 end | |
69 | |
70 end | |
71 | |
72 function packet_handlers.pingreq(session, packet) | |
73 session.conn:write(mqtt.serialize_packet{type = "pingresp"}); | |
74 end | |
75 | |
76 local sessions = {}; | |
77 | |
78 local mqtt_listener = {}; | |
79 | |
80 function mqtt_listener.onconnect(conn) | |
81 sessions[conn] = { | |
82 conn = conn; | |
83 stream = mqtt.new_stream(); | |
84 subscriptions = {}; | |
85 }; | |
86 end | |
87 | |
88 function mqtt_listener.onincoming(conn, data) | |
89 local session = sessions[conn]; | |
90 if session then | |
91 local packets = session.stream:feed(data); | |
92 for i = 1, #packets do | |
93 handle_packet(session, packets[i]); | |
94 end | |
95 end | |
96 end | |
97 | |
98 function mqtt_listener.ondisconnect(conn) | |
99 local session = sessions[conn]; | |
100 for topic in pairs(session.subscriptions) do | |
101 local host, node = topic:match("^([^/]+)/(.+)$"); | |
102 local subs = pubsub_subscribers[host]; | |
103 if subs then | |
104 local node_subs = subs[node]; | |
105 if node_subs then | |
106 node_subs[session] = nil; | |
107 end | |
108 end | |
109 end | |
110 sessions[conn] = nil; | |
111 module:log("debug", "MQTT client disconnected"); | |
112 end | |
113 | |
114 module:provides("net", { | |
115 default_port = 1883; | |
116 listener = mqtt_listener; | |
117 }); | |
118 | |
119 local function tostring_content(item) | |
120 return tostring(item[1]); | |
121 end | |
122 | |
123 local data_translators = setmetatable({ | |
124 ["data https://prosody.im/protocol/mqtt"] = tostring_content; | |
125 ["json urn:xmpp:json:0"] = tostring_content; | |
126 }, { | |
127 __index = function () return tostring; end; | |
128 }); | |
129 | |
130 function module.add_host(module) | |
131 local pubsub_module = hosts[module.host].modules.pubsub | |
132 if pubsub_module then | |
133 module:log("debug", "MQTT enabled for %s", module.host); | |
134 module:depends("pubsub"); | |
135 pubsub_services[module.host] = assert(pubsub_module.service); | |
136 local subscribers = {}; | |
137 pubsub_subscribers[module.host] = subscribers; | |
138 local function handle_publish(event) | |
139 -- Build MQTT packet | |
140 local packet = mqtt.serialize_packet{ | |
141 type = "publish"; | |
142 id = "\000\000"; | |
143 topic = module.host.."/"..event.node; | |
144 data = data_translators[event.item.name.." "..event.item.attr.xmlns](event.item); | |
145 }; | |
146 -- Broadcast to subscribers | |
147 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node); | |
148 for session in pairs(subscribers[event.node] or {}) do | |
149 session.conn:write(packet); | |
150 module:log("debug", "Sent to %s", tostring(session)); | |
151 end | |
152 end | |
153 pubsub_services[module.host].events.add_handler("item-published", handle_publish); | |
154 function module.unload() | |
155 module:log("debug", "MQTT disabled for %s", module.host); | |
156 pubsub_module.service.remove_handler("item-published", handle_publish); | |
157 pubsub_services[module.host] = nil; | |
158 pubsub_subscribers[module.host] = nil; | |
159 end | |
160 end | |
161 end |