Software /
code /
verse
Comparison
bosh.lua @ 93:2442e751f3cb
verse.bosh: Implemented retry/reconnect logic, and handling of disconnects (either CM-intiated or due to connection failures)
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 08 Aug 2010 01:21:22 +0100 |
parent | 89:1752a9097e6b |
child | 161:b177bcea2006 |
comparison
equal
deleted
inserted
replaced
92:dcccef33f0eb | 93:2442e751f3cb |
---|---|
7 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); | 7 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); |
8 stream_mt.__index = stream_mt; | 8 stream_mt.__index = stream_mt; |
9 | 9 |
10 local xmlns_stream = "http://etherx.jabber.org/streams"; | 10 local xmlns_stream = "http://etherx.jabber.org/streams"; |
11 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; | 11 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; |
12 | |
13 local reconnect_timeout = 5; | |
12 | 14 |
13 function verse.new_bosh(logger, url) | 15 function verse.new_bosh(logger, url) |
14 local stream = { | 16 local stream = { |
15 bosh_conn_pool = {}; | 17 bosh_conn_pool = {}; |
16 bosh_waiting_requests = {}; | 18 bosh_waiting_requests = {}; |
19 bosh_url = url; | 21 bosh_url = url; |
20 conn = {}; | 22 conn = {}; |
21 }; | 23 }; |
22 function stream:reopen() | 24 function stream:reopen() |
23 self.bosh_need_restart = true; | 25 self.bosh_need_restart = true; |
24 self:flush(true); | 26 self:flush(); |
25 end | |
26 function stream.bosh_response_handler(response, code, request) | |
27 return stream:_handle_response(response, code, request); | |
28 end | 27 end |
29 local conn = verse.new(logger, stream); | 28 local conn = verse.new(logger, stream); |
30 return setmetatable(conn, stream_mt); | 29 return setmetatable(conn, stream_mt); |
31 end | 30 end |
32 | 31 |
38 self:debug("Putting into BOSH send buffer: %s", tostring(data)); | 37 self:debug("Putting into BOSH send buffer: %s", tostring(data)); |
39 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); | 38 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); |
40 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) | 39 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) |
41 end | 40 end |
42 | 41 |
43 function stream_mt:flush(force) | 42 function stream_mt:flush() |
44 if self.connected | 43 if self.connected |
45 and #self.bosh_waiting_requests < self.bosh_max_requests | 44 and #self.bosh_waiting_requests < self.bosh_max_requests |
46 and (force or #self.bosh_outgoing_buffer > 0) then | 45 and (#self.bosh_waiting_requests == 0 |
46 or #self.bosh_outgoing_buffer > 0 | |
47 or self.bosh_need_restart) then | |
47 self:debug("Flushing..."); | 48 self:debug("Flushing..."); |
48 local payload = self:_make_body(); | 49 local payload = self:_make_body(); |
49 local buffer = self.bosh_outgoing_buffer; | 50 local buffer = self.bosh_outgoing_buffer; |
50 for i, stanza in ipairs(buffer) do | 51 for i, stanza in ipairs(buffer) do |
51 payload:add_child(stanza); | 52 payload:add_child(stanza); |
52 buffer[i] = nil; | 53 buffer[i] = nil; |
53 end | 54 end |
54 local request = http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler); | 55 self:_make_request(payload); |
56 else | |
57 self:debug("Decided not to flush."); | |
58 end | |
59 end | |
60 | |
61 function stream_mt:_make_request(payload) | |
62 local request, err = http.request(self.bosh_url, { body = tostring(payload) }, function (response, code, request) | |
63 if code ~= 0 then | |
64 self.inactive_since = nil; | |
65 return self:_handle_response(response, code, request); | |
66 end | |
67 | |
68 -- Connection issues, we need to retry this request | |
69 local time = os.time(); | |
70 if not self.inactive_since then | |
71 self.inactive_since = time; -- So we know when it is time to give up | |
72 elseif time - self.inactive_since > self.bosh_max_inactivity then | |
73 return self:_disconnected(); | |
74 else | |
75 self:debug("%d seconds left to reconnect, retrying in %d seconds...", | |
76 self.bosh_max_inactivity - (time - self.inactive_since), reconnect_timeout); | |
77 end | |
78 | |
79 -- Set up reconnect timer | |
80 timer.add_task(reconnect_timeout, function () | |
81 self:debug("Retrying request..."); | |
82 -- Remove old request | |
83 for i, waiting_request in ipairs(self.bosh_waiting_requests) do | |
84 if waiting_request == request then | |
85 table.remove(self.bosh_waiting_requests, i); | |
86 break; | |
87 end | |
88 end | |
89 self:_make_request(payload); | |
90 end); | |
91 end); | |
92 if request then | |
55 table.insert(self.bosh_waiting_requests, request); | 93 table.insert(self.bosh_waiting_requests, request); |
56 else | 94 else |
57 self:debug("Decided not to flush."); | 95 self:warn("Request failed instantly: %s", err); |
58 end | 96 end |
97 end | |
98 | |
99 function stream_mt:_disconnected() | |
100 self.connected = nil; | |
101 self:event("disconnected"); | |
59 end | 102 end |
60 | 103 |
61 function stream_mt:_send_session_request() | 104 function stream_mt:_send_session_request() |
62 local body = self:_make_body(); | 105 local body = self:_make_body(); |
63 | 106 |
70 -- XEP-0206 | 113 -- XEP-0206 |
71 body.attr.from = self.jid; | 114 body.attr.from = self.jid; |
72 body.attr.to = self.host; | 115 body.attr.to = self.host; |
73 body.attr.secure = 'true'; | 116 body.attr.secure = 'true'; |
74 | 117 |
75 http.request(self.bosh_url, { body = tostring(body) }, function (response) | 118 http.request(self.bosh_url, { body = tostring(body) }, function (response, code) |
119 if code == 0 then | |
120 -- Failed to connect | |
121 return self:_disconnected(); | |
122 end | |
76 -- Handle session creation response | 123 -- Handle session creation response |
77 local payload = self:_parse_response(response) | 124 local payload = self:_parse_response(response) |
78 if not payload then | 125 if not payload then |
79 self:warn("Invalid session creation response"); | 126 self:warn("Invalid session creation response"); |
80 self:event("disconnected"); | 127 self:_disconnected(); |
81 return; | 128 return; |
82 end | 129 end |
83 self.bosh_sid = payload.attr.sid; | 130 self.bosh_sid = payload.attr.sid; -- Session id |
84 self.bosh_wait = tonumber(payload.attr.wait); | 131 self.bosh_wait = tonumber(payload.attr.wait); -- How long the server may hold connections for |
85 self.bosh_hold = tonumber(payload.attr.hold); | 132 self.bosh_hold = tonumber(payload.attr.hold); -- How many connections the server may hold |
86 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; | 133 self.bosh_max_inactivity = tonumber(payload.attr.inactivity); -- Max amount of time with no connections |
134 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; -- Max simultaneous requests we can make | |
87 self.connected = true; | 135 self.connected = true; |
88 self:event("connected"); | 136 self:event("connected"); |
89 self:_handle_response_payload(payload); | 137 self:_handle_response_payload(payload); |
90 end); | 138 end); |
91 end | 139 end |
92 | 140 |
93 function stream_mt:_handle_response(response, code, request) | 141 function stream_mt:_handle_response(response, code, request) |
94 if self.bosh_waiting_requests[1] ~= request then | 142 if self.bosh_waiting_requests[1] ~= request then |
95 self:warn("Server replied to request that wasn't the oldest"); | 143 self:warn("Server replied to request that wasn't the oldest"); |
144 for i, waiting_request in ipairs(self.bosh_waiting_requests) do | |
145 if waiting_request == request then | |
146 self.bosh_waiting_requests[i] = nil; | |
147 break; | |
148 end | |
149 end | |
96 else | 150 else |
97 table.remove(self.bosh_waiting_requests, 1); | 151 table.remove(self.bosh_waiting_requests, 1); |
98 end | 152 end |
99 local payload = self:_parse_response(response); | 153 local payload = self:_parse_response(response); |
100 if payload then | 154 if payload then |
101 self:_handle_response_payload(payload); | 155 self:_handle_response_payload(payload); |
102 end | 156 end |
103 if #self.bosh_waiting_requests == 0 then | 157 self:flush(); |
104 self:debug("We have no requests open, so forcing flush..."); | |
105 self:flush(true); | |
106 else | |
107 self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests); | |
108 end | |
109 end | 158 end |
110 | 159 |
111 function stream_mt:_handle_response_payload(payload) | 160 function stream_mt:_handle_response_payload(payload) |
112 for stanza in payload:childtags() do | 161 for stanza in payload:childtags() do |
113 if stanza.attr.xmlns == xmlns_stream then | 162 if stanza.attr.xmlns == xmlns_stream then |
116 self:event("stream/"..stanza.attr.xmlns, stanza); | 165 self:event("stream/"..stanza.attr.xmlns, stanza); |
117 else | 166 else |
118 self:event("stanza", stanza); | 167 self:event("stanza", stanza); |
119 end | 168 end |
120 end | 169 end |
170 if payload.attr.type == "terminate" then | |
171 self:_disconnected({reason = payload.attr.condition}); | |
172 end | |
121 end | 173 end |
122 | 174 |
123 local stream_callbacks = { | 175 local stream_callbacks = { |
124 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", | 176 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", |
125 default_ns = "jabber:client", | 177 default_ns = "jabber:client", |
126 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; | 178 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; |
127 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; | 179 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; |
128 }; | 180 }; |
129 function stream_mt:_parse_response(response) | 181 function stream_mt:_parse_response(response) |
130 self:debug("Parsing response: %s", response); | 182 self:debug("Parsing response: %s", response); |
183 if response == nil then | |
184 self:debug("%s", debug.traceback()); | |
185 self:_disconnected(); | |
186 return; | |
187 end | |
131 local session = { notopen = true, log = self.log }; | 188 local session = { notopen = true, log = self.log }; |
132 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); | 189 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); |
133 parser:parse(response); | 190 parser:parse(response); |
134 return session.payload; | 191 return session.payload; |
135 end | 192 end |