Software /
code /
verse
Comparison
bosh.lua @ 87:d59073722924
verse.bosh: Use verse.new_bosh(logger, url) to make a BOSH connection
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 06 Aug 2010 16:28:50 +0100 |
child | 89:1752a9097e6b |
comparison
equal
deleted
inserted
replaced
86:508f653e9d46 | 87:d59073722924 |
---|---|
1 | |
2 local init_xmlhandlers = require "core.xmlhandlers"; | |
3 local st = require "util.stanza"; | |
4 | |
5 local stream_mt = setmetatable({}, { __index = verse.stream_mt }); | |
6 stream_mt.__index = stream_mt; | |
7 | |
8 local xmlns_stream = "http://etherx.jabber.org/streams"; | |
9 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; | |
10 | |
11 function verse.new_bosh(logger, url) | |
12 local stream = { | |
13 bosh_conn_pool = {}; | |
14 bosh_waiting_requests = {}; | |
15 bosh_rid = math.random(1,999999); | |
16 bosh_outgoing_buffer = {}; | |
17 bosh_url = url; | |
18 conn = {}; | |
19 }; | |
20 function stream:reopen() | |
21 self.bosh_need_restart = true; | |
22 self:flush(true); | |
23 end | |
24 function stream.bosh_response_handler(response, code, request) | |
25 return stream:_handle_response(response, code, request); | |
26 end | |
27 local conn = verse.new(logger, stream); | |
28 conn:add_plugin("http"); | |
29 return setmetatable(conn, stream_mt); | |
30 end | |
31 | |
32 function stream_mt:connect() | |
33 self:_send_session_request(); | |
34 end | |
35 | |
36 function stream_mt:send(data) | |
37 self:debug("Putting into BOSH send buffer: %s", tostring(data)); | |
38 self.bosh_outgoing_buffer[#self.bosh_outgoing_buffer+1] = st.clone(data); | |
39 self:flush(); --TODO: Optimize by doing this on next tick (give a chance for data to buffer) | |
40 end | |
41 | |
42 function stream_mt:flush(force) | |
43 if self.connected | |
44 and #self.bosh_waiting_requests < self.bosh_max_requests | |
45 and (force or #self.bosh_outgoing_buffer > 0) then | |
46 self:debug("Flushing..."); | |
47 local payload = self:_make_body(); | |
48 local buffer = self.bosh_outgoing_buffer; | |
49 for i, stanza in ipairs(buffer) do | |
50 payload:add_child(stanza); | |
51 buffer[i] = nil; | |
52 end | |
53 local request = self.http.request(self.bosh_url, { body = tostring(payload) }, self.bosh_response_handler); | |
54 table.insert(self.bosh_waiting_requests, request); | |
55 else | |
56 self:debug("Decided not to flush."); | |
57 end | |
58 end | |
59 | |
60 function stream_mt:_send_session_request() | |
61 local body = self:_make_body(); | |
62 | |
63 -- XEP-0124 | |
64 body.attr.hold = "1"; | |
65 body.attr.wait = "60"; | |
66 body.attr["xml:lang"] = "en"; | |
67 body.attr.ver = "1.6"; | |
68 | |
69 -- XEP-0206 | |
70 body.attr.from = self.jid; | |
71 body.attr.to = self.host; | |
72 body.attr.secure = 'true'; | |
73 | |
74 self.http.request(self.bosh_url, { body = tostring(body) }, function (response) | |
75 -- Handle session creation response | |
76 local payload = self:_parse_response(response) | |
77 if not payload then | |
78 self:warn("Invalid session creation response"); | |
79 self:event("disconnected"); | |
80 return; | |
81 end | |
82 self.bosh_sid = payload.attr.sid; | |
83 self.bosh_wait = tonumber(payload.attr.wait); | |
84 self.bosh_hold = tonumber(payload.attr.hold); | |
85 self.bosh_max_requests = tonumber(payload.attr.requests) or self.bosh_hold; | |
86 self.connected = true; | |
87 self:event("connected"); | |
88 self:_handle_response_payload(payload); | |
89 end); | |
90 end | |
91 | |
92 function stream_mt:_handle_response(response, code, request) | |
93 if self.bosh_waiting_requests[1] ~= request then | |
94 self:warn("Server replied to request that wasn't the oldest"); | |
95 else | |
96 table.remove(self.bosh_waiting_requests, 1); | |
97 end | |
98 local payload = self:_parse_response(response); | |
99 if payload then | |
100 self:_handle_response_payload(payload); | |
101 end | |
102 if #self.bosh_waiting_requests == 0 then | |
103 self:debug("We have no requests open, so forcing flush..."); | |
104 self:flush(true); | |
105 else | |
106 self:debug("We have %d requests open, so no need to force a flush", #self.bosh_waiting_requests); | |
107 end | |
108 end | |
109 | |
110 function stream_mt:_handle_response_payload(payload) | |
111 for stanza in payload:childtags() do | |
112 if stanza.attr.xmlns == xmlns_stream then | |
113 self:event("stream-"..stanza.name, stanza); | |
114 elseif stanza.attr.xmlns then | |
115 self:event("stream/"..stanza.attr.xmlns, stanza); | |
116 else | |
117 self:event("stanza", stanza); | |
118 end | |
119 end | |
120 end | |
121 | |
122 local stream_callbacks = { | |
123 stream_ns = "http://jabber.org/protocol/httpbind", stream_tag = "body", | |
124 default_ns = "jabber:client", | |
125 streamopened = function (session, attr) session.notopen = nil; session.payload = verse.stanza("body", attr); return true; end; | |
126 handlestanza = function (session, stanza) session.payload:add_child(stanza); end; | |
127 }; | |
128 function stream_mt:_parse_response(response) | |
129 self:debug("Parsing response: %s", response); | |
130 local session = { notopen = true, log = self.log }; | |
131 local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1"); | |
132 parser:parse(response); | |
133 return session.payload; | |
134 end | |
135 | |
136 function stream_mt:_make_body() | |
137 self.bosh_rid = self.bosh_rid + 1; | |
138 local body = verse.stanza("body", { | |
139 xmlns = xmlns_bosh; | |
140 content = "text/xml; charset=utf-8"; | |
141 sid = self.bosh_sid; | |
142 rid = self.bosh_rid; | |
143 }); | |
144 if self.bosh_need_restart then | |
145 self.bosh_need_restart = nil; | |
146 body.attr.restart = 'true'; | |
147 end | |
148 return body; | |
149 end |