Comparison

plugins/mod_bosh.lua @ 7382:c8923f882274

Merge 0.10->trunk
author Kim Alvefur <zash@zash.se>
date Tue, 19 Apr 2016 17:10:42 +0200
parent 7359:a5a080c12c96
parent 7381:a05bf94646ba
child 7388:77a3ef937152
comparison
equal deleted inserted replaced
7373:9b37aaea68e9 7382:c8923f882274
20 local log = logger.init("mod_bosh"); 20 local log = logger.init("mod_bosh");
21 local initialize_filters = require "util.filters".initialize; 21 local initialize_filters = require "util.filters".initialize;
22 local math_min = math.min; 22 local math_min = math.min;
23 local xpcall, tostring, type = xpcall, tostring, type; 23 local xpcall, tostring, type = xpcall, tostring, type;
24 local traceback = debug.traceback; 24 local traceback = debug.traceback;
25 local runner = require"util.async".runner;
26 25
27 local xmlns_streams = "http://etherx.jabber.org/streams"; 26 local xmlns_streams = "http://etherx.jabber.org/streams";
28 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 27 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
29 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send) 28 local xmlns_bosh = "http://jabber.org/protocol/httpbind"; -- (hard-coded into a literal in session.send)
30 29
61 60
62 local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat; 61 local t_insert, t_remove, t_concat = table.insert, table.remove, table.concat;
63 local os_time = os.time; 62 local os_time = os.time;
64 63
65 -- All sessions, and sessions that have no requests open 64 -- All sessions, and sessions that have no requests open
66 local sessions = module:shared("sessions"); 65 local sessions, inactive_sessions = module:shared("sessions", "inactive_sessions");
67 66
68 -- Used to respond to idle sessions (those with waiting requests) 67 -- Used to respond to idle sessions (those with waiting requests)
68 local waiting_requests = module:shared("waiting_requests");
69 function on_destroy_request(request) 69 function on_destroy_request(request)
70 log("debug", "Request destroyed: %s", tostring(request)); 70 log("debug", "Request destroyed: %s", tostring(request));
71 waiting_requests[request] = nil;
71 local session = sessions[request.context.sid]; 72 local session = sessions[request.context.sid];
72 if session then 73 if session then
73 local requests = session.requests; 74 local requests = session.requests;
74 for i, r in ipairs(requests) do 75 for i, r in ipairs(requests) do
75 if r == request then 76 if r == request then
79 end 80 end
80 81
81 -- If this session now has no requests open, mark it as inactive 82 -- If this session now has no requests open, mark it as inactive
82 local max_inactive = session.bosh_max_inactive; 83 local max_inactive = session.bosh_max_inactive;
83 if max_inactive and #requests == 0 then 84 if max_inactive and #requests == 0 then
84 if session.inactive_timer then 85 inactive_sessions[session] = os_time() + max_inactive;
85 session.inactive_timer:stop();
86 end
87 session.inactive_timer = module:add_timer(max_inactive, check_inactive, session, request.context,
88 "BOSH client silent for over "..max_inactive.." seconds");
89 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); 86 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
90 end 87 end
91 if session.bosh_wait_timer then
92 session.bosh_wait_timer:stop();
93 session.bosh_wait_timer = nil;
94 end
95 end
96 end
97
98 function check_inactive(now, session, context, reason)
99 if not sessions.destroyed then
100 sessions[context.sid] = nil;
101 sm_destroy_session(session, reason);
102 end 88 end
103 end 89 end
104 90
105 local function set_cross_domain_headers(response) 91 local function set_cross_domain_headers(response)
106 local headers = response.headers; 92 local headers = response.headers;
130 response.context = context; 116 response.context = context;
131 117
132 local headers = response.headers; 118 local headers = response.headers;
133 headers.content_type = "text/xml; charset=utf-8"; 119 headers.content_type = "text/xml; charset=utf-8";
134 120
135 if cross_domain and request.headers.origin then 121 if cross_domain and event.request.headers.origin then
136 set_cross_domain_headers(response); 122 set_cross_domain_headers(response);
137 end 123 end
138 124
139 -- stream:feed() calls the stream_callbacks, so all stanzas in 125 -- stream:feed() calls the stream_callbacks, so all stanzas in
140 -- the body are processed in this next line before it returns. 126 -- the body are processed in this next line before it returns.
141 -- In particular, the streamopened() stream callback is where 127 -- In particular, the streamopened() stream callback is where
142 -- much of the session logic happens, because it's where we first 128 -- much of the session logic happens, because it's where we first
143 -- get to see the 'sid' of this request. 129 -- get to see the 'sid' of this request.
144 if not stream:feed(body) then 130 local ok, err = stream:feed(body);
145 module:log("warn", "Error parsing BOSH payload") 131 if not ok then
146 return 400; 132 module:log("warn", "Error parsing BOSH payload; %s", err)
133 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
134 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
135 return tostring(close_reply);
147 end 136 end
148 137
149 -- Stanzas (if any) in the request have now been processed, and 138 -- Stanzas (if any) in the request have now been processed, and
150 -- we take care of the high-level BOSH logic here, including 139 -- we take care of the high-level BOSH logic here, including
151 -- giving a response or putting the request "on hold". 140 -- giving a response or putting the request "on hold".
198 session:close(); 187 session:close();
199 return nil; 188 return nil;
200 else 189 else
201 return true; -- Inform http server we shall reply later 190 return true; -- Inform http server we shall reply later
202 end 191 end
192 elseif response.finished then
193 return; -- A response has been sent already
203 end 194 end
204 module:log("warn", "Unable to associate request with a session (incomplete request?)"); 195 module:log("warn", "Unable to associate request with a session (incomplete request?)");
205 return 400; 196 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
197 ["xmlns:stream"] = xmlns_streams, condition = "item-not-found" });
198 return tostring(close_reply) .. "\n";
206 end 199 end
207 200
208 function after_bosh_wait(now, request, session) 201 function after_bosh_wait(now, request, session)
209 if request.conn then 202 if request.conn then
210 session.send(""); 203 session.send("");
261 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); 254 log("debug", "BOSH body open (sid: %s)", sid or "<none>");
262 if not sid then 255 if not sid then
263 -- New session request 256 -- New session request
264 context.notopen = nil; -- Signals that we accept this opening tag 257 context.notopen = nil; -- Signals that we accept this opening tag
265 258
266 -- TODO: Sanity checks here (rid, to, known host, etc.) 259 local to_host = nameprep(attr.to);
267 if not hosts[attr.to] then 260 local rid = tonumber(attr.rid);
261 local wait = tonumber(attr.wait);
262 if not to_host then
263 log("debug", "BOSH client tried to connect to invalid host: %s", tostring(attr.to));
264 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
265 ["xmlns:stream"] = xmlns_streams, condition = "improper-addressing" });
266 response:send(tostring(close_reply));
267 return;
268 elseif not hosts[to_host] then
268 -- Unknown host 269 -- Unknown host
269 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); 270 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
270 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", 271 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
271 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); 272 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
272 response:send(tostring(close_reply)); 273 response:send(tostring(close_reply));
273 return; 274 return;
274 end 275 end
276 if not rid or (not wait and attr.wait or wait < 0 or wait % 1 ~= 0) then
277 log("debug", "BOSH client sent invalid rid or wait attributes: rid=%s, wait=%s", tostring(attr.rid), tostring(attr.wait));
278 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
279 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
280 response:send(tostring(close_reply));
281 return;
282 end
283
284 rid = rid - 1;
285 wait = math_min(wait, bosh_max_wait);
275 286
276 -- New session 287 -- New session
277 sid = new_uuid(); 288 sid = new_uuid();
278 local session = { 289 local session = {
279 type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to, 290 type = "c2s_unauthed", conn = {}, sid = sid, rid = rid-1, host = attr.to,
280 bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid, 291 bosh_version = attr.ver, bosh_wait = wait, streamid = sid,
281 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY, 292 bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
282 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream, 293 requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
283 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true, 294 close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
284 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure, 295 log = logger.init("bosh"..sid), secure = consider_bosh_secure or request.secure,
285 ip = get_ip_from_request(request); 296 ip = get_ip_from_request(request);
418 429
419 function stream_callbacks.error(context, error) 430 function stream_callbacks.error(context, error)
420 log("debug", "Error parsing BOSH request payload; %s", error); 431 log("debug", "Error parsing BOSH request payload; %s", error);
421 if not context.sid then 432 if not context.sid then
422 local response = context.response; 433 local response = context.response;
423 response.status_code = 400; 434 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
424 response:send(); 435 ["xmlns:stream"] = xmlns_streams, condition = "bad-request" });
436 response:send(tostring(close_reply));
425 return; 437 return;
426 end 438 end
427 439
428 local session = sessions[context.sid]; 440 local session = sessions[context.sid];
429 if error == "stream-error" then -- Remote stream error, we close normally 441 if error == "stream-error" then -- Remote stream error, we close normally