Software /
code /
prosody
Comparison
plugins/mod_bosh.lua @ 3043:1fadbb2e3ca0
Merge with 0.6
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Mon, 17 May 2010 11:56:36 +0100 |
parent | 2961:db3c0ecce3f4 |
parent | 3042:b1961f6c9853 |
child | 3070:3238b58fd118 |
comparison
equal
deleted
inserted
replaced
3009:06f7d8054065 | 3043:1fadbb2e3ca0 |
---|---|
61 function on_destroy_request(request) | 61 function on_destroy_request(request) |
62 waiting_requests[request] = nil; | 62 waiting_requests[request] = nil; |
63 local session = sessions[request.sid]; | 63 local session = sessions[request.sid]; |
64 if session then | 64 if session then |
65 local requests = session.requests; | 65 local requests = session.requests; |
66 for i,r in pairs(requests) do | 66 for i,r in ipairs(requests) do |
67 if r == request then requests[i] = nil; break; end | 67 if r == request then |
68 t_remove(requests, i); | |
69 break; | |
70 end | |
68 end | 71 end |
69 | 72 |
70 -- If this session now has no requests open, mark it as inactive | 73 -- If this session now has no requests open, mark it as inactive |
71 if #requests == 0 and session.bosh_max_inactive and not inactive_sessions[session] then | 74 if #requests == 0 and session.bosh_max_inactive and not inactive_sessions[session] then |
72 inactive_sessions[session] = os_time(); | 75 inactive_sessions[session] = os_time(); |
88 return; | 91 return; |
89 end | 92 end |
90 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); | 93 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); |
91 request.notopen = true; | 94 request.notopen = true; |
92 request.log = log; | 95 request.log = log; |
96 request.on_destroy = on_destroy_request; | |
97 | |
93 local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1"); | 98 local parser = lxp.new(init_xmlhandlers(request, stream_callbacks), "\1"); |
94 | 99 |
95 parser:parse(body); | 100 parser:parse(body); |
96 | 101 |
97 local session = sessions[request.sid]; | 102 local session = sessions[request.sid]; |
116 local resp = t_concat(session.send_buffer); | 121 local resp = t_concat(session.send_buffer); |
117 session.send_buffer = {}; | 122 session.send_buffer = {}; |
118 session.send(resp); | 123 session.send(resp); |
119 end | 124 end |
120 | 125 |
121 if not request.destroyed and session.bosh_wait then | 126 if not request.destroyed then |
122 request.reply_before = os_time() + session.bosh_wait; | 127 -- We're keeping this request open, to respond later |
123 request.on_destroy = on_destroy_request; | 128 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
124 waiting_requests[request] = true; | 129 if session.bosh_wait then |
125 end | 130 request.reply_before = os_time() + session.bosh_wait; |
126 | 131 waiting_requests[request] = true; |
127 log("debug", "Have nothing to say, so leaving request unanswered for now"); | 132 end |
128 return true; | 133 if inactive_sessions[session] then |
134 -- Session was marked as inactive, since we have | |
135 -- a request open now, unmark it | |
136 inactive_sessions[session] = nil; | |
137 end | |
138 end | |
139 | |
140 return true; -- Inform httpserver we shall reply later | |
129 end | 141 end |
130 end | 142 end |
131 | 143 |
132 | 144 |
133 local function bosh_reset_stream(session) session.notopen = true; end | 145 local function bosh_reset_stream(session) session.notopen = true; end |
160 return; | 172 return; |
161 end | 173 end |
162 | 174 |
163 -- New session | 175 -- New session |
164 sid = new_uuid(); | 176 sid = new_uuid(); |
165 local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, | 177 local session = { type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid, |
166 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, | 178 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, |
167 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream, | 179 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, close = bosh_close_stream, |
168 dispatch_stanza = core_process_stanza, log = logger.init("bosh"..sid), secure = request.secure }; | 180 dispatch_stanza = core_process_stanza, log = logger.init("bosh"..sid), secure = request.secure }; |
169 sessions[sid] = session; | 181 sessions[sid] = session; |
170 | 182 |
172 local r, send_buffer = session.requests, session.send_buffer; | 184 local r, send_buffer = session.requests, session.send_buffer; |
173 local response = { headers = default_headers } | 185 local response = { headers = default_headers } |
174 function session.send(s) | 186 function session.send(s) |
175 --log("debug", "Sending BOSH data: %s", tostring(s)); | 187 --log("debug", "Sending BOSH data: %s", tostring(s)); |
176 local oldest_request = r[1]; | 188 local oldest_request = r[1]; |
177 while oldest_request and oldest_request.destroyed do | |
178 t_remove(r, 1); | |
179 waiting_requests[oldest_request] = nil; | |
180 oldest_request = r[1]; | |
181 end | |
182 if oldest_request then | 189 if oldest_request then |
183 log("debug", "We have an open request, so sending on that"); | 190 log("debug", "We have an open request, so sending on that"); |
184 response.body = t_concat{"<body xmlns='http://jabber.org/protocol/httpbind' sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", tostring(s), "</body>" }; | 191 response.body = t_concat{"<body xmlns='http://jabber.org/protocol/httpbind' sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", tostring(s), "</body>" }; |
185 oldest_request:send(response); | 192 oldest_request:send(response); |
186 --log("debug", "Sent"); | 193 --log("debug", "Sent"); |
191 t_remove(r, 1); | 198 t_remove(r, 1); |
192 end | 199 end |
193 else | 200 else |
194 log("debug", "Destroying the request now..."); | 201 log("debug", "Destroying the request now..."); |
195 oldest_request:destroy(); | 202 oldest_request:destroy(); |
196 t_remove(r, 1); | |
197 end | 203 end |
198 elseif s ~= "" then | 204 elseif s ~= "" then |
199 log("debug", "Saved to send buffer because there are %d open requests", #r); | 205 log("debug", "Saved to send buffer because there are %d open requests", #r); |
200 -- Hmm, no requests are open :( | 206 -- Hmm, no requests are open :( |
201 t_insert(session.send_buffer, tostring(s)); | 207 t_insert(session.send_buffer, tostring(s)); |
233 local diff = rid - session.rid; | 239 local diff = rid - session.rid; |
234 if diff > 1 then | 240 if diff > 1 then |
235 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); | 241 session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid); |
236 elseif diff <= 0 then | 242 elseif diff <= 0 then |
237 -- Repeated, ignore | 243 -- Repeated, ignore |
238 session.log("debug", "rid repeated (on request %s), ignoring: %d", request.id, session.rid); | 244 session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff); |
239 request.notopen = nil; | 245 request.notopen = nil; |
246 request.sid = sid; | |
240 t_insert(session.requests, request); | 247 t_insert(session.requests, request); |
241 return; | 248 return; |
242 end | 249 end |
243 session.rid = rid; | 250 session.rid = rid; |
244 end | 251 end |
246 if attr.type == "terminate" then | 253 if attr.type == "terminate" then |
247 -- Client wants to end this session | 254 -- Client wants to end this session |
248 session:close(); | 255 session:close(); |
249 request.notopen = nil; | 256 request.notopen = nil; |
250 return; | 257 return; |
251 end | |
252 | |
253 -- If session was inactive, make sure it is now marked as not | |
254 if #session.requests == 0 then | |
255 (session.log or log)("debug", "BOSH client now active again at %d", os_time()); | |
256 inactive_sessions[session] = nil; | |
257 end | 258 end |
258 | 259 |
259 if session.notopen then | 260 if session.notopen then |
260 local features = st.stanza("stream:features"); | 261 local features = st.stanza("stream:features"); |
261 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); | 262 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |