Comparison

mod_pubsub_mqtt/mqtt.lib.lua @ 1240:e0d97eb52ab8

mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
author Matthew Wild <mwild1@gmail.com>
date Sun, 01 Dec 2013 19:12:08 +0000
child 1343:7dbde05b48a9
comparison
equal deleted inserted replaced
1239:cc5cbeeb9fc7 1240:e0d97eb52ab8
1 local bit = require "bit";
2
-- Method table shared by all stream objects. It is its own __index, so any
-- table given this metatable looks up missing fields (its methods) here.
local stream_mt = {};
stream_mt.__index = stream_mt;
5
-- Read exactly n_bytes from the stream's buffer.
--
-- Must be called from inside the parser coroutine: whenever the buffer holds
-- fewer than n_bytes, the coroutine yields (with no values) and expects to be
-- resumed with a string of newly received data, which is appended to the
-- buffer. This repeats until enough bytes are available.
--
-- Returns the first n_bytes of the buffer as a string; the remainder is left
-- in self.buffer for the next read.
function stream_mt:read_bytes(n_bytes)
	local data = self.buffer;
	-- Take the length only when a buffer exists; applying `#` to nil would throw.
	module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, data and #data or 0);
	if not data then
		module:log("debug", "No data, pausing.");
		data = coroutine.yield();
		module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes);
	end
	-- Keep pausing until the buffered data covers the request.
	while #data < n_bytes do
		module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
		data = data..coroutine.yield();
		self.buffer = data;
		module:log("debug", "Now we have %d bytes, reading...", #data);
	end
	-- Split off the requested prefix and keep the rest buffered.
	local chunk = data:sub(1, n_bytes);
	self.buffer = data:sub(n_bytes+1);
	module:log("debug", "Returning %d bytes (buffer: %d)", #chunk, #self.buffer);
	return chunk;
end
25
-- Read a length-prefixed string: a 2-byte big-endian length followed by that
-- many bytes of content.
--
-- Returns the string content and the total number of bytes consumed from the
-- stream (content length plus the 2-byte prefix).
function stream_mt:read_string()
	local prefix = self:read_bytes(2);
	local high, low = prefix:byte(1, 2);
	local body_length = bit.lshift(high, 8) + low;
	local body = self:read_bytes(body_length);
	return body, body_length + 2;
end
31
-- Packet type names indexed by their numeric code (the upper four bits of the
-- first header byte). Used both to name incoming packets and, by reverse
-- lookup, to find the code when serializing.
local packet_type_codes = {
	"connect", "connack",
	"publish", "puback", "pubrec", "pubrel", "pubcomp",
	"subscribe", "suback", "unsubscribe", "unsuback", -- was misspelled "subak", which broke serializing "suback"
	"pingreq", "pingresp",
	"disconnect"
};
39
-- Parse one complete packet from the stream, pausing the parser coroutine
-- (via read_bytes) whenever more input is needed.
--
-- Returns a table with:
--   type    packet type name from packet_type_codes, or nil if the type code
--           is unknown or a connect packet carried an unexpected signature
--   dup, retain   booleans taken from the header flag bits
--   qos     integer taken from header bits 2-1
--   length  the "remaining length" announced by the header
--   data    any bytes of the packet not consumed by the fields below
-- plus, depending on type:
--   connect:   version, connect_flags, keepalive_timer (16-bit integer)
--   publish:   topic, and id (raw 2-byte string) when qos is 1 or 2
--   subscribe: topics (array of topic strings), and id (raw 2-byte string)
--              when qos is 1 or 2
function stream_mt:read_packet()
	local packet = {};

	-- First header byte: type in the high nibble, flags in the low nibble.
	local header = self:read_bytes(1):byte();
	packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
	packet.dup = bit.band(header, 0x08) == 0x08;
	packet.qos = bit.rshift(bit.band(header, 0x06), 1);
	packet.retain = bit.band(header, 0x01) == 0x01;

	-- Remaining length: 7 bits per byte, least significant group first,
	-- with the top bit set on every byte except the last.
	local length, multiplier = 0, 1;
	repeat
		local digit = self:read_bytes(1):byte();
		length = length + bit.band(digit, 0x7f)*multiplier;
		multiplier = multiplier*128;
	until bit.band(digit, 0x80) == 0;
	packet.length = length;

	-- From here on, `length` tracks how many bytes of this packet are unread.
	if packet.type == "connect" then
		local signature, consumed = self:read_string();
		-- Account for the signature even when it is rejected, so the trailing
		-- read below does not run past the end of this packet.
		length = length - consumed;
		if signature ~= "MQIsdp" then
			module:log("warn", "Unexpected packet signature!");
			packet.type = nil; -- Invalid packet
		else
			packet.version = self:read_bytes(1):byte();
			packet.connect_flags = self:read_bytes(1):byte();
			-- The keep-alive timer is a 16-bit big-endian value.
			local high, low = self:read_bytes(2):byte(1, 2);
			packet.keepalive_timer = bit.lshift(high, 8) + low;
			length = length - 4;
		end
	elseif packet.type == "publish" then
		packet.topic = self:read_string();
		length = length - (#packet.topic+2);
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
	elseif packet.type == "subscribe" then
		if packet.qos == 1 or packet.qos == 2 then
			packet.id = self:read_bytes(2);
			length = length - 2;
		end
		local topics = {};
		while length > 0 do
			local topic, len = self:read_string();
			table.insert(topics, topic);
			self:read_bytes(1); -- QoS not used
			length = length - (len+1);
		end
		packet.topics = topics;
	end

	-- Whatever is left of the packet is handed over uninterpreted.
	if length > 0 then
		packet.data = self:read_bytes(length);
	end
	return packet;
end
92
-- Build the incremental parser for a stream object.
--
-- The result is a coroutine-wrapped function. Each call passes in a string of
-- input; the call returns a parsed packet when one has been completed, or
-- nothing when the parser paused waiting for more data. The first call's
-- input replaces self.buffer; input passed after a packet has been returned
-- is appended to it.
local function new_parser(self)
	local function parse_loop(chunk)
		self.buffer = chunk;
		repeat
			local packet = self:read_packet();
			-- Hand the packet to the caller and wait for the next input.
			chunk = coroutine.yield(packet);
			module:log("debug", "Parser: %d new bytes", #chunk);
			self.buffer = (self.buffer or "")..chunk;
		until false
	end
	return coroutine.wrap(parse_loop);
end
103
-- Push newly received data into the parser and collect every packet that can
-- be completed with it.
--
-- Returns an array (possibly empty) of packet tables. Any incomplete trailing
-- packet stays pending inside the parser until more data is fed.
function stream_mt:feed(data)
	module:log("debug", "Feeding %d bytes", #data);
	local received = {};
	local input = data;
	while true do
		local packet = self.parser(input);
		if not packet then
			break;
		end
		module:log("debug", "Received packet");
		received[#received + 1] = packet;
		-- After the first call, keep draining the buffer without adding input.
		input = "";
	end
	module:log("debug", "Returning %d packets", #received);
	return received;
end
116
-- Create a new stream object: an empty table using stream_mt for its methods,
-- with its own parser stored in the `parser` field.
local function new_stream()
	local stream = {};
	setmetatable(stream, stream_mt);
	stream.parser = new_parser(stream);
	return stream;
end
122
-- Encode a string with a 2-byte big-endian length prefix.
local function length_prefixed(str)
	local len = #str;
	-- The high byte must be shifted down, otherwise any length >= 256 is
	-- passed to string.char as a value above 255.
	return string.char(bit.rshift(bit.band(len, 0xff00), 8), bit.band(len, 0x00ff))..str;
end

-- Serialize a packet table into the bytes to put on the wire.
--
-- Fields read from `packet`:
--   type    packet type name; looked up in packet_type_codes (a name that is
--           not found is encoded as type code 0)
--   data    payload string (optional)
--   topic   for "publish": topic name, written before the payload
--   topics  for "suback": array of topic strings; replaces the payload
--
-- The packet table is not modified, so the same table can be serialized
-- more than once. Returns the encoded packet as a string.
local function serialize_packet(packet)
	local type_num = 0;
	for i, v in ipairs(packet_type_codes) do
		if v == packet.type then
			type_num = i;
			break;
		end
	end
	-- Only the type nibble is encoded; the flag bits are always zero.
	local header = string.char(bit.lshift(type_num, 4));

	local payload = packet.data or "";
	if packet.type == "publish" then
		payload = length_prefixed(packet.topic or "")..payload;
	elseif packet.type == "suback" then
		-- NOTE(review): this emits each topic name followed by a zero byte.
		-- The MQTT 3.1 SUBACK layout is a message ID followed by granted-QoS
		-- bytes -- confirm this is what connected clients expect.
		local parts = {};
		for _, topic in ipairs(packet.topics or {}) do
			table.insert(parts, length_prefixed(topic).."\000");
		end
		payload = table.concat(parts);
	end

	-- Remaining length: 7 bits per byte, least significant group first,
	-- with the top bit set on every byte except the last.
	local length = #payload;
	repeat
		local digit = length%128;
		length = math.floor(length/128);
		if length > 0 then
			digit = bit.bor(digit, 0x80);
		end
		header = header..string.char(digit);
	until length <= 0;

	return header..payload;
end
157
-- Public interface of this library: the stream constructor and the packet
-- serializer.
local exports = {};
exports.new_stream = new_stream;
exports.serialize_packet = serialize_packet;
return exports;