Software /
code /
prosody-modules
Comparison
mod_pubsub_mqtt/mqtt.lib.lua @ 1240:e0d97eb52ab8
mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 01 Dec 2013 19:12:08 +0000 |
child | 1343:7dbde05b48a9 |
comparison
equal
deleted
inserted
replaced
1239:cc5cbeeb9fc7 | 1240:e0d97eb52ab8 |
---|---|
1 local bit = require "bit"; | |
2 | |
-- Metatable shared by every stream object. Its __index points back at itself,
-- so functions defined on stream_mt are reachable as methods of any table that
-- has stream_mt as its metatable.
local stream_mt = {};
stream_mt.__index = stream_mt;
5 | |
-- Take exactly n_bytes from the front of the stream buffer.
--
-- Must be called from inside a coroutine: while fewer than n_bytes are
-- buffered it yields (with no values) and treats the string the coroutine is
-- resumed with as newly arrived data, which is appended to the buffer.
--
-- n_bytes: number of bytes to consume
-- Returns: a string of exactly n_bytes bytes; any unread remainder is left in
--          self.buffer
function stream_mt:read_bytes(n_bytes)
	-- Normalise a missing buffer first, so the length operator never sees nil
	local data = self.buffer or "";
	module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, #data);
	while #data < n_bytes do
		module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
		self.buffer = data;
		-- Suspend until more data is fed in; the resume value is the new chunk
		data = data..coroutine.yield();
		module:log("debug", "Now we have %d bytes, reading...", #data);
	end
	-- Both right-hand sides are evaluated before either assignment happens
	data, self.buffer = data:sub(1, n_bytes), data:sub(n_bytes+1);
	module:log("debug", "Returning %d bytes (buffer: %d)", #data, #self.buffer);
	return data;
end
25 | |
-- Read a length-prefixed string: two length bytes (most significant first)
-- followed by that many bytes of content.
--
-- Returns: the string content, and the total number of bytes consumed
--          (content length plus the 2 prefix bytes)
function stream_mt:read_string()
	local prefix = self:read_bytes(2);
	local high_byte, low_byte = prefix:byte(1, 2);
	local content_length = bit.lshift(high_byte, 8) + low_byte;
	local content = self:read_bytes(content_length);
	return content, content_length + 2;
end
31 | |
-- Packet type names indexed by their numeric type code (1 = "connect" ...
-- 14 = "disconnect"). Used to map the header's type nibble to a name when
-- parsing, and searched by name when serializing.
local packet_type_codes = {
	"connect", "connack",
	"publish", "puback", "pubrec", "pubrel", "pubcomp",
	"subscribe", "suback", "unsubscribe", "unsuback",
	"pingreq", "pingresp",
	"disconnect"
};
39 | |
-- Parse one complete packet from the stream.
--
-- Must run inside a coroutine, because read_bytes() yields whenever the
-- buffer does not yet hold enough data.
--
-- Returns a table with:
--   type    packet type name from packet_type_codes (nil if the type code is
--           unknown or a connect packet carries the wrong signature)
--   dup, qos, retain   flags decoded from the first header byte
--   length  the "remaining length" announced in the header
--   data    any bytes of the packet not consumed by the type-specific
--           parsing below (absent when nothing is left)
-- plus, depending on type:
--   connect:   version, connect_flags, keepalive_timer
--   publish:   topic, and id (raw 2-byte string) when qos is 1 or 2
--   subscribe: topics (list of strings), and id (raw 2-byte string) when
--              qos is 1 or 2
function stream_mt:read_packet()
	local packet = {};

	-- First header byte: type in bits 7-4, DUP in bit 3, QoS in bits 2-1, RETAIN in bit 0
	local header = self:read_bytes(1):byte();
	packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
	packet.dup = bit.band(header, 0x08) == 0x08;
	packet.qos = bit.rshift(bit.band(header, 0x06), 1);
	packet.retain = bit.band(header, 0x01) == 0x01;

	-- Remaining length: 7 bits per byte, least significant group first;
	-- the top bit of each byte signals that another length byte follows
	local length, multiplier = 0, 1;
	repeat
		local digit = self:read_bytes(1):byte();
		length = length + bit.band(digit, 0x7f)*multiplier;
		multiplier = multiplier*128;
	until bit.band(digit, 0x80) == 0;
	packet.length = length;

	-- From here on, 'length' tracks how many bytes of this packet are still unread
	if packet.type == "connect" then
		local signature, signature_len = self:read_string();
		-- Account for the signature bytes whether or not they match, so the
		-- trailing read below never runs past the end of this packet
		length = length - signature_len;
		if signature ~= "MQIsdp" then
			module:log("warn", "Unexpected packet signature!");
			packet.type = nil; -- Invalid packet
		else
			packet.version = self:read_bytes(1):byte();
			packet.connect_flags = self:read_bytes(1):byte();
			-- The keep-alive timer is a 16-bit value, most significant byte first
			local keepalive_hi, keepalive_lo = self:read_bytes(2):byte(1, 2);
			packet.keepalive_timer = keepalive_hi*256 + keepalive_lo;
			length = length - 4;
		end
	elseif packet.type == "publish" then
		packet.topic = self:read_string();
		length = length - (#packet.topic+2);
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
	elseif packet.type == "subscribe" then
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
		-- The rest of the packet is a list of (topic string, QoS byte) pairs
		local topics = {};
		while length > 0 do
			local topic, len = self:read_string();
			table.insert(topics, topic);
			self:read_bytes(1); -- QoS not used
			length = length - (len+1);
		end
		packet.topics = topics;
	end

	-- Whatever is left belongs to this packet; hand it over uninterpreted
	if length > 0 then
		packet.data = self:read_bytes(length);
	end
	return packet;
end
92 | |
-- Build the parser for a stream object.
--
-- The result is a coroutine wrapped as a function. Each call passes in a
-- chunk of data; it returns a parsed packet when one is complete, or nothing
-- when read_bytes() paused to wait for more data.
local function new_parser(self)
	local function parse_forever(chunk)
		-- The first chunk seeds the buffer
		self.buffer = chunk;
		repeat
			local packet = self:read_packet();
			-- Hand the packet to the caller and wait for the next chunk
			chunk = coroutine.yield(packet);
			module:log("debug", "Parser: %d new bytes", #chunk);
			local pending = self.buffer or "";
			self.buffer = pending..chunk;
		until false;
	end
	return coroutine.wrap(parse_forever);
end
103 | |
-- Push newly received data into the stream and collect every packet that
-- can be completed with it.
--
-- data: string of received bytes
-- Returns: an array of parsed packet tables (empty if none completed)
function stream_mt:feed(data)
	module:log("debug", "Feeding %d bytes", #data);
	local packets = {};
	-- The first call delivers the new data; later calls pass an empty string
	-- so the parser keeps draining what it already has buffered
	local chunk = data;
	while true do
		local packet = self.parser(chunk);
		if not packet then
			break;
		end
		module:log("debug", "Received packet");
		packets[#packets + 1] = packet;
		chunk = "";
	end
	module:log("debug", "Returning %d packets", #packets);
	return packets;
end
116 | |
-- Create a stream object: a table using stream_mt as its metatable, with its
-- own parser stored in the 'parser' field.
local function new_stream()
	local stream = {};
	setmetatable(stream, stream_mt);
	stream.parser = new_parser(stream);
	return stream;
end
122 | |
-- Encode a string with a 2-byte length prefix (most significant byte first),
-- the same layout stream_mt:read_string() decodes.
local function encode_string(str)
	local len = #str;
	-- Shift the high byte down so both values fit string.char's 0..255 range
	return string.char(bit.rshift(len, 8), bit.band(len, 0xff))..str;
end

-- Serialize a packet table into its wire representation.
--
-- packet.type   packet type name; a name not found in packet_type_codes is
--               encoded as type code 0
-- packet.data   payload string (optional)
-- packet.topic  for "publish": topic name placed before the payload (optional)
-- packet.topics for "suback": list of topic strings that forms the payload
--
-- The packet table is not modified.
-- Returns: the serialized packet as a string
local function serialize_packet(packet)
	-- Find the numeric code for this type (its index in packet_type_codes)
	local type_num = 0;
	for i, v in ipairs(packet_type_codes) do
		if v == packet.type then
			type_num = i;
			break;
		end
	end
	-- Type code goes in the upper four bits; the flag bits are left at zero
	local header = string.char(bit.lshift(type_num, 4));

	-- Build the payload in a local rather than writing back into packet.data,
	-- so serializing the same table twice gives the same result
	local payload = packet.data or "";
	if packet.type == "publish" then
		payload = encode_string(packet.topic or "")..payload;
	elseif packet.type == "suback" then
		-- NOTE(review): this emits each topic followed by a zero byte; confirm
		-- against the MQTT specification's SUBACK layout before relying on it
		local t = {};
		for _, topic in ipairs(packet.topics) do
			table.insert(t, encode_string(topic).."\000");
		end
		payload = table.concat(t);
	end

	-- Remaining length: 7 bits per byte, least significant group first, with
	-- the top bit set on every byte except the last
	local length = #payload;
	repeat
		local digit = length%128;
		length = math.floor(length/128);
		if length > 0 then
			digit = bit.bor(digit, 0x80);
		end
		header = header..string.char(digit);
	until length <= 0;

	return header..payload;
end
157 | |
-- Public interface of this library
local exports = {};
exports.new_stream = new_stream;
exports.serialize_packet = serialize_packet;
return exports;