Software /
code /
verse
Changeset
93:2442e751f3cb
verse.bosh: Implemented retry/reconnect logic, and handling of disconnects (either CM-intiated or due to connection failures)
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Sun, 08 Aug 2010 01:21:22 +0100 |
parents | 92:dcccef33f0eb |
children | 94:b40465267fb5 |
files | bosh.lua |
diffstat | 1 files changed, 77 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/bosh.lua Sun Aug 08 01:18:12 2010 +0100 +++ b/bosh.lua Sun Aug 08 01:21:22 2010 +0100 @@ -10,6 +10,8 @@ local xmlns_stream = "http://etherx.jabber.org/streams"; local xmlns_bosh = "http://jabber.org/protocol/httpbind"; +local reconnect_timeout = 5; + function verse.new_bosh(logger, url) local stream = { bosh_conn_pool = {}; @@ -21,10 +23,7 @@ }; function stream:reopen() self.bosh_need_restart = true; - self:flush(true); - end - function stream.bosh_response_handler(response, code, request) - return stream:_handle_response(response, code, request); + self:flush(); end local conn = verse.new(logger, stream); return setmetatable(conn, stream_mt); @@ -40,10 +39,12 @@ self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) end -function stream_mt:flush(force) +function stream_mt:flush() if self.connected and #self.bosh_waiting_requests < self.bosh_max_requests - and (force or #self.bosh_outgoing_buffer > 0) then + and (#self.bosh_waiting_requests == 0 + or #self.bosh_outgoing_buffer > 0 + or self.bosh_need_restart) then self:debug("Flushing..."); local payload = self:_make_body(); local buffer = self.bosh_outgoing_buffer; @@ -51,13 +52,55 @@ payload:add_child(stanza); buffer[i] = nil; end - local request = http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler); - table.insert(self.bosh_waiting_requests, request); + self:_make_request(payload); else self:debug("Decided not to flush."); end end +function stream_mt:_make_request(payload) + local request, err = http.request(self.bosh_url, { body = tostring(payload) }, function (response, code, request) + if code ~= 0 then + self.inactive_since = nil; + return self:_handle_response(response, code, request); + end + + -- Connection issues, we need to retry this request + local time = os.time(); + if not self.inactive_since then + self.inactive_since = time; -- So we know when it is time to give up + elseif time - self.inactive_since > self.bosh_max_inactivity then + return self:_disconnected(); + else + self:debug("%d seconds left to reconnect, retrying in %d seconds...", + self.bosh_max_inactivity - (time - self.inactive_since), reconnect_timeout); + end + + -- Set up reconnect timer + timer.add_task(reconnect_timeout, function () + self:debug("Retrying request..."); + -- Remove old request + for i, waiting_request in ipairs(self.bosh_waiting_requests) do + if waiting_request == request then + table.remove(self.bosh_waiting_requests, i); + break; + end + end + self:_make_request(payload); + end); + end); + if request then + table.insert(self.bosh_waiting_requests, request); + else + self:warn("Request failed instantly: %s", err); + end +end + +function stream_mt:_disconnected() + self.connected = nil; + self:event("disconnected"); +end + function stream_mt:_send_session_request() local body = self:_make_body(); @@ -72,18 +115,23 @@ body.attr.to = self.host; body.attr.secure = 'true'; - http.request(self.bosh_url, { body = tostring(body) }, function (response) + http.request(self.bosh_url, { body = tostring(body) }, function (response, code) + if code == 0 then + -- Failed to connect + return self:_disconnected(); + end -- Handle session creation response local payload = self:_parse_response(response) if not payload then self:warn("Invalid session creation response"); - self:event("disconnected"); + self:_disconnected(); return; end - self.bosh_sid = payload.attr.sid; - self.bosh_wait = tonumber(payload.attr.wait); - self.bosh_hold = tonumber(payload.attr.hold); - self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; + self.bosh_sid = payload.attr.sid; -- Session id + self.bosh_wait = tonumber(payload.attr.wait); -- How long the server may hold connections for + self.bosh_hold = tonumber(payload.attr.hold); -- How many connections the server may hold + self.bosh_max_inactivity = tonumber(payload.attr.inactivity); -- Max amount of time with no connections + self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; -- Max simultaneous requests we can make self.connected = true; self:event("connected"); self:_handle_response_payload(payload); @@ -93,6 +141,12 @@ function stream_mt:_handle_response(response, code, request) if self.bosh_waiting_requests[1] ~= request then self:warn("Server replied to request that wasn't the oldest"); + for i, waiting_request in ipairs(self.bosh_waiting_requests) do + if waiting_request == request then + self.bosh_waiting_requests[i] = nil; + break; + end + end else table.remove(self.bosh_waiting_requests, 1); end @@ -100,12 +154,7 @@ if payload then self:_handle_response_payload(payload); end - if #self.bosh_waiting_requests == 0 then - self:debug("We have no requests open, so forcing flush..."); - self:flush(true); - else - self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests); - end + self:flush(); end function stream_mt:_handle_response_payload(payload) @@ -118,6 +167,9 @@ self:event("stanza", stanza); end end + if payload.attr.type == "terminate" then + self:_disconnected({reason = payload.attr.condition}); + end end local stream_callbacks = { @@ -128,6 +180,11 @@ }; function stream_mt:_parse_response(response) self:debug("Parsing response: %s", response); + if response == nil then + self:debug("%s", debug.traceback()); + self:_disconnected(); + return; + end local session = { notopen = true, log = self.log }; local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); parser:parse(response);