Software /
code /
prosody
Comparison
net/server_epoll.lua @ 10563:e8db377a2983
Merge 0.11->trunk
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Tue, 24 Dec 2019 00:39:45 +0100 |
parent | 10546:944863f878b9 |
child | 10571:cfeb0077c9e9 |
comparison
equal
deleted
inserted
replaced
10562:670afc079f68 | 10563:e8db377a2983 |
---|---|
7 | 7 |
8 | 8 |
9 local t_insert = table.insert; | 9 local t_insert = table.insert; |
10 local t_concat = table.concat; | 10 local t_concat = table.concat; |
11 local setmetatable = setmetatable; | 11 local setmetatable = setmetatable; |
12 local tostring = tostring; | |
13 local pcall = pcall; | 12 local pcall = pcall; |
14 local type = type; | 13 local type = type; |
15 local next = next; | 14 local next = next; |
16 local pairs = pairs; | 15 local pairs = pairs; |
17 local log = require "util.logger".init("server_epoll"); | 16 local logger = require "util.logger"; |
17 local log = logger.init("server_epoll"); | |
18 local socket = require "socket"; | 18 local socket = require "socket"; |
19 local luasec = require "ssl"; | 19 local luasec = require "ssl"; |
20 local gettime = require "util.time".now; | 20 local realtime = require "util.time".now; |
21 local monotonic = require "util.time".monotonic; | |
21 local indexedbheap = require "util.indexedbheap"; | 22 local indexedbheap = require "util.indexedbheap"; |
22 local createtable = require "util.table".create; | 23 local createtable = require "util.table".create; |
23 local inet = require "util.net"; | 24 local inet = require "util.net"; |
24 local inet_pton = inet.pton; | 25 local inet_pton = inet.pton; |
25 local _SOCKETINVALID = socket._SOCKETINVALID or -1; | 26 local _SOCKETINVALID = socket._SOCKETINVALID or -1; |
27 local new_id = require "util.id".medium; | |
26 | 28 |
27 local poller = require "util.poll" | 29 local poller = require "util.poll" |
28 local EEXIST = poller.EEXIST; | 30 local EEXIST = poller.EEXIST; |
29 local ENOENT = poller.ENOENT; | 31 local ENOENT = poller.ENOENT; |
30 | 32 |
36 local default_config = { __index = { | 38 local default_config = { __index = { |
37 -- If a connection is silent for this long, close it unless onreadtimeout says not to | 39 -- If a connection is silent for this long, close it unless onreadtimeout says not to |
38 read_timeout = 14 * 60; | 40 read_timeout = 14 * 60; |
39 | 41 |
40 -- How long to wait for a socket to become writable after queuing data to send | 42 -- How long to wait for a socket to become writable after queuing data to send |
41 send_timeout = 60; | 43 send_timeout = 180; |
44 | |
45 -- How long to wait for a socket to become writable after creation | |
46 connect_timeout = 20; | |
42 | 47 |
43 -- Some number possibly influencing how many pending connections can be accepted | 48 -- Some number possibly influencing how many pending connections can be accepted |
44 tcp_backlog = 128; | 49 tcp_backlog = 128; |
45 | 50 |
46 -- If accepting a new incoming connection fails, wait this long before trying again | 51 -- If accepting a new incoming connection fails, wait this long before trying again |
56 ssl_handshake_timeout = 60; | 61 ssl_handshake_timeout = 60; |
57 | 62 |
58 -- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers) | 63 -- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers) |
59 max_wait = 86400; | 64 max_wait = 86400; |
60 min_wait = 1e-06; | 65 min_wait = 1e-06; |
66 | |
67 -- EXPERIMENTAL | |
68 -- Whether to kill connections in case of callback errors. | |
69 fatal_errors = false; | |
70 | |
71 -- Attempt writes instantly | |
72 opportunistic_writes = false; | |
61 }}; | 73 }}; |
62 local cfg = default_config.__index; | 74 local cfg = default_config.__index; |
63 | 75 |
64 local fds = createtable(10, 0); -- FD -> conn | 76 local fds = createtable(10, 0); -- FD -> conn |
65 | 77 |
73 t[2] = noop; | 85 t[2] = noop; |
74 timers:remove(t.id); | 86 timers:remove(t.id); |
75 end | 87 end |
76 | 88 |
77 local function reschedule(t, time) | 89 local function reschedule(t, time) |
90 time = monotonic() + time; | |
78 t[1] = time; | 91 t[1] = time; |
79 timers:reprioritize(t.id, time); | 92 timers:reprioritize(t.id, time); |
80 end | 93 end |
81 | 94 |
82 -- Add absolute timer | 95 -- Add relative timer |
83 local function at(time, f) | 96 local function addtimer(timeout, f) |
97 local time = monotonic() + timeout; | |
84 local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; | 98 local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; |
85 timer.id = timers:insert(timer, time); | 99 timer.id = timers:insert(timer, time); |
86 return timer; | 100 return timer; |
87 end | |
88 | |
89 -- Add relative timer | |
90 local function addtimer(timeout, f) | |
91 return at(gettime() + timeout, f); | |
92 end | 101 end |
93 | 102 |
94 -- Run callbacks of expired timers | 103 -- Run callbacks of expired timers |
95 -- Return time until next timeout | 104 -- Return time until next timeout |
96 local function runtimers(next_delay, min_wait) | 105 local function runtimers(next_delay, min_wait) |
97 -- Any timers at all? | 106 -- Any timers at all? |
98 local now = gettime(); | 107 local elapsed = monotonic(); |
108 local now = realtime(); | |
99 local peek = timers:peek(); | 109 local peek = timers:peek(); |
100 while peek do | 110 while peek do |
101 | 111 |
102 if peek > now then | 112 if peek > elapsed then |
103 next_delay = peek - now; | 113 next_delay = peek - elapsed; |
104 break; | 114 break; |
105 end | 115 end |
106 | 116 |
107 local _, timer, id = timers:pop(); | 117 local _, timer = timers:pop(); |
108 local ok, ret = pcall(timer[2], now); | 118 local ok, ret = pcall(timer[2], now); |
109 if ok and type(ret) == "number" then | 119 if ok and type(ret) == "number" then |
110 local next_time = now+ret; | 120 local next_time = elapsed+ret; |
111 timer[1] = next_time; | 121 timer[1] = next_time; |
112 timers:insert(timer, next_time); | 122 timers:insert(timer, next_time); |
113 end | 123 end |
114 | 124 |
115 peek = timers:peek(); | 125 peek = timers:peek(); |
116 end | 126 end |
117 if peek == nil then | 127 if peek == nil then |
118 return next_delay; | 128 return next_delay; |
119 end | 129 end |
120 | 130 |
121 if next_delay < min_wait then | 131 if next_delay < min_wait then |
136 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); | 146 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); |
137 end | 147 end |
138 return ("FD %d"):format(self:getfd()); | 148 return ("FD %d"):format(self:getfd()); |
139 end | 149 end |
140 | 150 |
151 interface.log = log; | |
152 function interface:debug(msg, ...) --luacheck: ignore 212/self | |
153 self.log("debug", msg, ...); | |
154 end | |
155 | |
156 function interface:error(msg, ...) --luacheck: ignore 212/self | |
157 self.log("error", msg, ...); | |
158 end | |
159 | |
141 -- Replace the listener and tell the old one | 160 -- Replace the listener and tell the old one |
142 function interface:setlistener(listeners, data) | 161 function interface:setlistener(listeners, data) |
143 self:on("detach"); | 162 self:on("detach"); |
144 self.listeners = listeners; | 163 self.listeners = listeners; |
145 self:on("attach", data); | 164 self:on("attach", data); |
146 end | 165 end |
147 | 166 |
148 -- Call a listener callback | 167 -- Call a listener callback |
149 function interface:on(what, ...) | 168 function interface:on(what, ...) |
150 if not self.listeners then | 169 if not self.listeners then |
151 log("error", "%s has no listeners", self); | 170 self:debug("Interface is missing listener callbacks"); |
152 return; | 171 return; |
153 end | 172 end |
154 local listener = self.listeners["on"..what]; | 173 local listener = self.listeners["on"..what]; |
155 if not listener then | 174 if not listener then |
156 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging | 175 -- self:debug("Missing listener 'on%s'", what); -- uncomment for development and debugging |
157 return; | 176 return; |
158 end | 177 end |
159 local ok, err = pcall(listener, self, ...); | 178 local ok, err = pcall(listener, self, ...); |
160 if not ok then | 179 if not ok then |
161 log("error", "Error calling on%s: %s", what, err); | 180 if cfg.fatal_errors then |
181 self:debug("Closing due to error calling on%s: %s", what, err); | |
182 self:destroy(); | |
183 else | |
184 self:debug("Error calling on%s: %s", what, err); | |
185 end | |
186 return nil, err; | |
162 end | 187 end |
163 return err; | 188 return err; |
189 end | |
190 | |
191 -- Allow this one to be overridden | |
192 function interface:onincoming(...) | |
193 return self:on("incoming", ...); | |
164 end | 194 end |
165 | 195 |
166 -- Return the file descriptor number | 196 -- Return the file descriptor number |
167 function interface:getfd() | 197 function interface:getfd() |
168 if self.conn then | 198 if self.conn then |
224 end | 254 end |
225 return | 255 return |
226 end | 256 end |
227 t = t or cfg.read_timeout; | 257 t = t or cfg.read_timeout; |
228 if self._readtimeout then | 258 if self._readtimeout then |
229 self._readtimeout:reschedule(gettime() + t); | 259 self._readtimeout:reschedule(t); |
230 else | 260 else |
231 self._readtimeout = addtimer(t, function () | 261 self._readtimeout = addtimer(t, function () |
232 if self:on("readtimeout") then | 262 if self:on("readtimeout") then |
263 self:debug("Read timeout handled"); | |
233 return cfg.read_timeout; | 264 return cfg.read_timeout; |
234 else | 265 else |
266 self:debug("Read timeout not handled, disconnecting"); | |
235 self:on("disconnect", "read timeout"); | 267 self:on("disconnect", "read timeout"); |
236 self:destroy(); | 268 self:destroy(); |
237 end | 269 end |
238 end); | 270 end); |
239 end | 271 end |
248 end | 280 end |
249 return | 281 return |
250 end | 282 end |
251 t = t or cfg.send_timeout; | 283 t = t or cfg.send_timeout; |
252 if self._writetimeout then | 284 if self._writetimeout then |
253 self._writetimeout:reschedule(gettime() + t); | 285 self._writetimeout:reschedule(t); |
254 else | 286 else |
255 self._writetimeout = addtimer(t, function () | 287 self._writetimeout = addtimer(t, function () |
288 self:debug("Write timeout"); | |
256 self:on("disconnect", "write timeout"); | 289 self:on("disconnect", "write timeout"); |
257 self:destroy(); | 290 self:destroy(); |
258 end); | 291 end); |
259 end | 292 end |
260 end | 293 end |
267 if r == nil then r = self._wantread; end | 300 if r == nil then r = self._wantread; end |
268 if w == nil then w = self._wantwrite; end | 301 if w == nil then w = self._wantwrite; end |
269 local ok, err, errno = poll:add(fd, r, w); | 302 local ok, err, errno = poll:add(fd, r, w); |
270 if not ok then | 303 if not ok then |
271 if errno == EEXIST then | 304 if errno == EEXIST then |
272 log("debug", "%s already registered!", self); | 305 self:debug("FD already registered in poller! (EEXIST)"); |
273 return self:set(r, w); -- So try to change its flags | 306 return self:set(r, w); -- So try to change its flags |
274 end | 307 end |
275 log("error", "Could not register %s: %s(%d)", self, err, errno); | 308 self:debug("Could not register in poller: %s(%d)", err, errno); |
276 return ok, err; | 309 return ok, err; |
277 end | 310 end |
278 self._wantread, self._wantwrite = r, w; | 311 self._wantread, self._wantwrite = r, w; |
279 fds[fd] = self; | 312 fds[fd] = self; |
280 log("debug", "Watching %s", self); | 313 self:debug("Registered in poller"); |
281 return true; | 314 return true; |
282 end | 315 end |
283 | 316 |
284 function interface:set(r, w) | 317 function interface:set(r, w) |
285 local fd = self:getfd(); | 318 local fd = self:getfd(); |
288 end | 321 end |
289 if r == nil then r = self._wantread; end | 322 if r == nil then r = self._wantread; end |
290 if w == nil then w = self._wantwrite; end | 323 if w == nil then w = self._wantwrite; end |
291 local ok, err, errno = poll:set(fd, r, w); | 324 local ok, err, errno = poll:set(fd, r, w); |
292 if not ok then | 325 if not ok then |
293 log("error", "Could not update poller state %s: %s(%d)", self, err, errno); | 326 self:debug("Could not update poller state: %s(%d)", err, errno); |
294 return ok, err; | 327 return ok, err; |
295 end | 328 end |
296 self._wantread, self._wantwrite = r, w; | 329 self._wantread, self._wantwrite = r, w; |
297 return true; | 330 return true; |
298 end | 331 end |
305 if fds[fd] ~= self then | 338 if fds[fd] ~= self then |
306 return nil, "unregistered fd"; | 339 return nil, "unregistered fd"; |
307 end | 340 end |
308 local ok, err, errno = poll:del(fd); | 341 local ok, err, errno = poll:del(fd); |
309 if not ok and errno ~= ENOENT then | 342 if not ok and errno ~= ENOENT then |
310 log("error", "Could not unregister %s: %s(%d)", self, err, errno); | 343 self:debug("Could not unregister: %s(%d)", err, errno); |
311 return ok, err; | 344 return ok, err; |
312 end | 345 end |
313 self._wantread, self._wantwrite = nil, nil; | 346 self._wantread, self._wantwrite = nil, nil; |
314 fds[fd] = nil; | 347 fds[fd] = nil; |
315 log("debug", "Unwatched %s", self); | 348 self:debug("Unregistered from poller"); |
316 return true; | 349 return true; |
317 end | 350 end |
318 | 351 |
319 function interface:setflags(r, w) | 352 function interface:setflags(r, w) |
320 if not(self._wantread or self._wantwrite) then | 353 if not(self._wantread or self._wantwrite) then |
332 -- Called when socket is readable | 365 -- Called when socket is readable |
333 function interface:onreadable() | 366 function interface:onreadable() |
334 local data, err, partial = self.conn:receive(self.read_size or cfg.read_size); | 367 local data, err, partial = self.conn:receive(self.read_size or cfg.read_size); |
335 if data then | 368 if data then |
336 self:onconnect(); | 369 self:onconnect(); |
337 self:on("incoming", data); | 370 self:onincoming(data); |
338 else | 371 else |
339 if err == "wantread" then | 372 if err == "wantread" then |
340 self:set(true, nil); | 373 self:set(true, nil); |
341 err = "timeout"; | 374 err = "timeout"; |
342 elseif err == "wantwrite" then | 375 elseif err == "wantwrite" then |
343 self:set(nil, true); | 376 self:set(nil, true); |
344 err = "timeout"; | 377 err = "timeout"; |
345 end | 378 end |
346 if partial and partial ~= "" then | 379 if partial and partial ~= "" then |
347 self:onconnect(); | 380 self:onconnect(); |
348 self:on("incoming", partial, err); | 381 self:onincoming(partial, err); |
349 end | 382 end |
350 if err ~= "timeout" then | 383 if err ~= "timeout" then |
351 self:on("disconnect", err); | 384 self:on("disconnect", err); |
352 self:destroy() | 385 self:destroy() |
353 return; | 386 return; |
354 end | 387 end |
355 end | 388 end |
356 if not self.conn then return; end | 389 if not self.conn then return; end |
390 if self._limit and (data or partial) then | |
391 local cost = self._limit * #(data or partial); | |
392 if cost > cfg.min_wait then | |
393 self:setreadtimeout(false); | |
394 self:pausefor(cost); | |
395 return; | |
396 end | |
397 end | |
357 if self._wantread and self.conn:dirty() then | 398 if self._wantread and self.conn:dirty() then |
358 self:setreadtimeout(false); | 399 self:setreadtimeout(false); |
359 self:pausefor(cfg.read_retry_delay); | 400 self:pausefor(cfg.read_retry_delay); |
360 else | 401 else |
361 self:setreadtimeout(); | 402 self:setreadtimeout(); |
376 end | 417 end |
377 self:setwritetimeout(false); | 418 self:setwritetimeout(false); |
378 self:ondrain(); -- Be aware of writes in ondrain | 419 self:ondrain(); -- Be aware of writes in ondrain |
379 return; | 420 return; |
380 elseif partial then | 421 elseif partial then |
422 self:debug("Sent %d out of %d buffered bytes", partial, #data); | |
381 buffer[1] = data:sub(partial+1); | 423 buffer[1] = data:sub(partial+1); |
382 for i = #buffer, 2, -1 do | 424 for i = #buffer, 2, -1 do |
383 buffer[i] = nil; | 425 buffer[i] = nil; |
384 end | 426 end |
427 self:set(nil, true); | |
385 self:setwritetimeout(); | 428 self:setwritetimeout(); |
386 end | 429 end |
387 if err == "wantwrite" or err == "timeout" then | 430 if err == "wantwrite" or err == "timeout" then |
388 self:set(nil, true); | 431 self:set(nil, true); |
389 elseif err == "wantread" then | 432 elseif err == "wantread" then |
405 if buffer then | 448 if buffer then |
406 t_insert(buffer, data); | 449 t_insert(buffer, data); |
407 else | 450 else |
408 self.writebuffer = { data }; | 451 self.writebuffer = { data }; |
409 end | 452 end |
410 self:setwritetimeout(); | 453 if not self._write_lock then |
411 self:set(nil, true); | 454 if cfg.opportunistic_writes then |
455 self:onwritable(); | |
456 return #data; | |
457 end | |
458 self:setwritetimeout(); | |
459 self:set(nil, true); | |
460 end | |
412 return #data; | 461 return #data; |
413 end | 462 end |
414 interface.send = interface.write; | 463 interface.send = interface.write; |
415 | 464 |
416 -- Close, possibly after writing is done | 465 -- Close, possibly after writing is done |
417 function interface:close() | 466 function interface:close() |
418 if self.writebuffer and self.writebuffer[1] then | 467 if self.writebuffer and self.writebuffer[1] then |
419 self:set(false, true); -- Flush final buffer contents | 468 self:set(false, true); -- Flush final buffer contents |
420 self.write, self.send = noop, noop; -- No more writing | 469 self.write, self.send = noop, noop; -- No more writing |
421 log("debug", "Close %s after writing", self); | 470 self:debug("Close after writing remaining buffered data"); |
422 self.ondrain = interface.close; | 471 self.ondrain = interface.close; |
423 else | 472 else |
424 log("debug", "Close %s now", self); | 473 self:debug("Closing now"); |
425 self.write, self.send = noop, noop; | 474 self.write, self.send = noop, noop; |
426 self.close = noop; | 475 self.close = noop; |
427 self:on("disconnect"); | 476 self:on("disconnect"); |
428 self:destroy(); | 477 self:destroy(); |
429 end | 478 end |
448 | 497 |
449 function interface:starttls(tls_ctx) | 498 function interface:starttls(tls_ctx) |
450 if tls_ctx then self.tls_ctx = tls_ctx; end | 499 if tls_ctx then self.tls_ctx = tls_ctx; end |
451 self.starttls = false; | 500 self.starttls = false; |
452 if self.writebuffer and self.writebuffer[1] then | 501 if self.writebuffer and self.writebuffer[1] then |
453 log("debug", "Start TLS on %s after write", self); | 502 self:debug("Start TLS after write"); |
454 self.ondrain = interface.starttls; | 503 self.ondrain = interface.starttls; |
455 self:set(nil, true); -- make sure wantwrite is set | 504 self:set(nil, true); -- make sure wantwrite is set |
456 else | 505 else |
457 if self.ondrain == interface.starttls then | 506 if self.ondrain == interface.starttls then |
458 self.ondrain = nil; | 507 self.ondrain = nil; |
459 end | 508 end |
460 self.onwritable = interface.tlshandskake; | 509 self.onwritable = interface.tlshandskake; |
461 self.onreadable = interface.tlshandskake; | 510 self.onreadable = interface.tlshandskake; |
462 self:set(true, true); | 511 self:set(true, true); |
463 log("debug", "Prepare to start TLS on %s", self); | 512 self:debug("Prepared to start TLS"); |
464 end | 513 end |
465 end | 514 end |
466 | 515 |
467 function interface:tlshandskake() | 516 function interface:tlshandskake() |
468 self:setwritetimeout(false); | 517 self:setwritetimeout(false); |
469 self:setreadtimeout(false); | 518 self:setreadtimeout(false); |
470 if not self._tls then | 519 if not self._tls then |
471 self._tls = true; | 520 self._tls = true; |
472 log("debug", "Start TLS on %s now", self); | 521 self:debug("Starting TLS now"); |
473 self:del(); | 522 self:del(); |
523 self:updatenames(); -- Can't getpeer/sockname after wrap() | |
474 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); | 524 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); |
475 if not ok then | 525 if not ok then |
476 conn, err = ok, conn; | 526 conn, err = ok, conn; |
477 log("error", "Failed to initialize TLS: %s", err); | 527 self:debug("Failed to initialize TLS: %s", err); |
478 end | 528 end |
479 if not conn then | 529 if not conn then |
480 self:on("disconnect", err); | 530 self:on("disconnect", err); |
481 self:destroy(); | 531 self:destroy(); |
482 return conn, err; | 532 return conn, err; |
483 end | 533 end |
484 conn:settimeout(0); | 534 conn:settimeout(0); |
485 self.conn = conn; | 535 self.conn = conn; |
536 if conn.sni then | |
537 if self.servername then | |
538 conn:sni(self.servername); | |
539 elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then | |
540 conn:sni(self._server.hosts, true); | |
541 end | |
542 end | |
486 self:on("starttls"); | 543 self:on("starttls"); |
487 self.ondrain = nil; | 544 self.ondrain = nil; |
488 self.onwritable = interface.tlshandskake; | 545 self.onwritable = interface.tlshandskake; |
489 self.onreadable = interface.tlshandskake; | 546 self.onreadable = interface.tlshandskake; |
490 return self:init(); | 547 return self:init(); |
491 end | 548 end |
492 local ok, err = self.conn:dohandshake(); | 549 local ok, err = self.conn:dohandshake(); |
493 if ok then | 550 if ok then |
494 log("debug", "TLS handshake on %s complete", self); | 551 local info = self.conn.info and self.conn:info(); |
552 if type(info) == "table" then | |
553 self:debug("TLS handshake complete (%s with %s)", info.protocol, info.cipher); | |
554 else | |
555 self:debug("TLS handshake complete"); | |
556 end | |
495 self.onwritable = nil; | 557 self.onwritable = nil; |
496 self.onreadable = nil; | 558 self.onreadable = nil; |
497 self:on("status", "ssl-handshake-complete"); | 559 self:on("status", "ssl-handshake-complete"); |
498 self:setwritetimeout(); | 560 self:setwritetimeout(); |
499 self:set(true, true); | 561 self:set(true, true); |
500 elseif err == "wantread" then | 562 elseif err == "wantread" then |
501 log("debug", "TLS handshake on %s to wait until readable", self); | 563 self:debug("TLS handshake to wait until readable"); |
502 self:set(true, false); | 564 self:set(true, false); |
503 self:setreadtimeout(cfg.ssl_handshake_timeout); | 565 self:setreadtimeout(cfg.ssl_handshake_timeout); |
504 elseif err == "wantwrite" then | 566 elseif err == "wantwrite" then |
505 log("debug", "TLS handshake on %s to wait until writable", self); | 567 self:debug("TLS handshake to wait until writable"); |
506 self:set(false, true); | 568 self:set(false, true); |
507 self:setwritetimeout(cfg.ssl_handshake_timeout); | 569 self:setwritetimeout(cfg.ssl_handshake_timeout); |
508 else | 570 else |
509 log("debug", "TLS handshake error on %s: %s", self, err); | 571 self:debug("TLS handshake error: %s", err); |
510 self:on("disconnect", err); | 572 self:on("disconnect", err); |
511 self:destroy(); | 573 self:destroy(); |
512 end | 574 end |
513 end | 575 end |
514 | 576 |
515 local function wrapsocket(client, server, read_size, listeners, tls_ctx) -- luasocket object -> interface object | 577 local function wrapsocket(client, server, read_size, listeners, tls_ctx, extra) -- luasocket object -> interface object |
516 client:settimeout(0); | 578 client:settimeout(0); |
579 local conn_id = ("conn%s"):format(new_id()); | |
517 local conn = setmetatable({ | 580 local conn = setmetatable({ |
518 conn = client; | 581 conn = client; |
519 _server = server; | 582 _server = server; |
520 created = gettime(); | 583 created = realtime(); |
521 listeners = listeners; | 584 listeners = listeners; |
522 read_size = read_size or (server and server.read_size); | 585 read_size = read_size or (server and server.read_size); |
523 writebuffer = {}; | 586 writebuffer = {}; |
524 tls_ctx = tls_ctx or (server and server.tls_ctx); | 587 tls_ctx = tls_ctx or (server and server.tls_ctx); |
525 tls_direct = server and server.tls_direct; | 588 tls_direct = server and server.tls_direct; |
589 id = conn_id; | |
590 log = logger.init(conn_id); | |
591 extra = extra; | |
526 }, interface_mt); | 592 }, interface_mt); |
593 | |
594 if extra then | |
595 if extra.servername then | |
596 conn.servername = extra.servername; | |
597 end | |
598 end | |
527 | 599 |
528 conn:updatenames(); | 600 conn:updatenames(); |
529 return conn; | 601 return conn; |
530 end | 602 end |
531 | 603 |
532 function interface:updatenames() | 604 function interface:updatenames() |
533 local conn = self.conn; | 605 local conn = self.conn; |
534 local ok, peername, peerport = pcall(conn.getpeername, conn); | 606 local ok, peername, peerport = pcall(conn.getpeername, conn); |
535 if ok then | 607 if ok and peername then |
536 self.peername, self.peerport = peername, peerport; | 608 self.peername, self.peerport = peername, peerport; |
537 end | 609 end |
538 local ok, sockname, sockport = pcall(conn.getsockname, conn); | 610 local ok, sockname, sockport = pcall(conn.getsockname, conn); |
539 if ok then | 611 if ok and sockname then |
540 self.sockname, self.sockport = sockname, sockport; | 612 self.sockname, self.sockport = sockname, sockport; |
541 end | 613 end |
542 end | 614 end |
543 | 615 |
544 -- A server interface has new incoming connections waiting | 616 -- A server interface has new incoming connections waiting |
545 -- This replaces the onreadable callback | 617 -- This replaces the onreadable callback |
546 function interface:onacceptable() | 618 function interface:onacceptable() |
547 local conn, err = self.conn:accept(); | 619 local conn, err = self.conn:accept(); |
548 if not conn then | 620 if not conn then |
549 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); | 621 self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); |
550 self:pausefor(cfg.accept_retry_interval); | 622 self:pausefor(cfg.accept_retry_interval); |
551 return; | 623 return; |
552 end | 624 end |
553 local client = wrapsocket(conn, self, nil, self.listeners); | 625 local client = wrapsocket(conn, self, nil, self.listeners); |
554 log("debug", "New connection %s", tostring(client)); | 626 client:debug("New connection %s on server %s", client, self); |
555 client:init(); | 627 client:init(); |
556 if self.tls_direct then | 628 if self.tls_direct then |
557 client:starttls(self.tls_ctx); | 629 client:starttls(self.tls_ctx); |
630 else | |
631 client:onconnect(); | |
558 end | 632 end |
559 end | 633 end |
560 | 634 |
561 -- Initialization | 635 -- Initialization |
562 function interface:init() | 636 function interface:init() |
563 self:setwritetimeout(); | 637 self:setwritetimeout(cfg.connect_timeout); |
564 return self:add(true, true); | 638 return self:add(true, true); |
565 end | 639 end |
566 | 640 |
567 function interface:pause() | 641 function interface:pause() |
642 self:debug("Pause reading"); | |
568 return self:set(false); | 643 return self:set(false); |
569 end | 644 end |
570 | 645 |
571 function interface:resume() | 646 function interface:resume() |
647 self:debug("Resume reading"); | |
572 return self:set(true); | 648 return self:set(true); |
573 end | 649 end |
574 | 650 |
575 -- Pause connection for some time | 651 -- Pause connection for some time |
576 function interface:pausefor(t) | 652 function interface:pausefor(t) |
653 self:debug("Pause for %fs", t); | |
577 if self._pausefor then | 654 if self._pausefor then |
578 self._pausefor:close(); | 655 self._pausefor:close(); |
579 end | 656 end |
580 if t == false then return; end | 657 if t == false then return; end |
581 self:set(false); | 658 self:set(false); |
582 self._pausefor = addtimer(t, function () | 659 self._pausefor = addtimer(t, function () |
583 self._pausefor = nil; | 660 self._pausefor = nil; |
584 self:set(true); | 661 self:set(true); |
662 self:debug("Resuming after pause, connection is %s", not self.conn and "missing" or self.conn:dirty() and "dirty" or "clean"); | |
585 if self.conn and self.conn:dirty() then | 663 if self.conn and self.conn:dirty() then |
586 self:onreadable(); | 664 self:onreadable(); |
587 end | 665 end |
588 end); | 666 end); |
589 end | 667 end |
590 | 668 |
669 function interface:setlimit(Bps) | |
670 if Bps > 0 then | |
671 self._limit = 1/Bps; | |
672 else | |
673 self._limit = nil; | |
674 end | |
675 end | |
676 | |
677 function interface:pause_writes() | |
678 if self._write_lock then | |
679 return | |
680 end | |
681 self:debug("Pause writes"); | |
682 self._write_lock = true; | |
683 self:setwritetimeout(false); | |
684 self:set(nil, false); | |
685 end | |
686 | |
687 function interface:resume_writes() | |
688 if not self._write_lock then | |
689 return | |
690 end | |
691 self:debug("Resume writes"); | |
692 self._write_lock = nil; | |
693 if self.writebuffer[1] then | |
694 self:setwritetimeout(); | |
695 self:set(nil, true); | |
696 end | |
697 end | |
698 | |
591 -- Connected! | 699 -- Connected! |
592 function interface:onconnect() | 700 function interface:onconnect() |
593 if self.conn and not self.peername and self.conn.getpeername then | 701 self:updatenames(); |
594 self.peername, self.peerport = self.conn:getpeername(); | 702 self:debug("Connected (%s)", self); |
595 end | |
596 self.onconnect = noop; | 703 self.onconnect = noop; |
597 self:on("connect"); | 704 self:on("connect"); |
598 end | 705 end |
599 | 706 |
600 local function addserver(addr, port, listeners, read_size, tls_ctx) | 707 local function listen(addr, port, listeners, config) |
601 local conn, err = socket.bind(addr, port, cfg.tcp_backlog); | 708 local conn, err = socket.bind(addr, port, cfg.tcp_backlog); |
602 if not conn then return conn, err; end | 709 if not conn then return conn, err; end |
603 conn:settimeout(0); | 710 conn:settimeout(0); |
604 local server = setmetatable({ | 711 local server = setmetatable({ |
605 conn = conn; | 712 conn = conn; |
606 created = gettime(); | 713 created = realtime(); |
607 listeners = listeners; | 714 listeners = listeners; |
715 read_size = config and config.read_size; | |
716 onreadable = interface.onacceptable; | |
717 tls_ctx = config and config.tls_ctx; | |
718 tls_direct = config and config.tls_direct; | |
719 hosts = config and config.sni_hosts; | |
720 sockname = addr; | |
721 sockport = port; | |
722 log = logger.init(("serv%s"):format(new_id())); | |
723 }, interface_mt); | |
724 server:debug("Server %s created", server); | |
725 server:add(true, false); | |
726 return server; | |
727 end | |
728 | |
729 -- COMPAT | |
730 local function addserver(addr, port, listeners, read_size, tls_ctx) | |
731 return listen(addr, port, listeners, { | |
608 read_size = read_size; | 732 read_size = read_size; |
609 onreadable = interface.onacceptable; | |
610 tls_ctx = tls_ctx; | 733 tls_ctx = tls_ctx; |
611 tls_direct = tls_ctx and true or false; | 734 tls_direct = tls_ctx and true or false; |
612 sockname = addr; | 735 }); |
613 sockport = port; | |
614 }, interface_mt); | |
615 server:add(true, false); | |
616 return server; | |
617 end | 736 end |
618 | 737 |
619 -- COMPAT | 738 -- COMPAT |
620 local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx) | 739 local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx, extra) |
621 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx); | 740 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra); |
622 if not client.peername then | 741 if not client.peername then |
623 client.peername, client.peerport = addr, port; | 742 client.peername, client.peerport = addr, port; |
624 end | 743 end |
625 local ok, err = client:init(); | 744 local ok, err = client:init(); |
626 if not ok then return ok, err; end | 745 if not ok then return ok, err; end |
629 end | 748 end |
630 return client; | 749 return client; |
631 end | 750 end |
632 | 751 |
633 -- New outgoing TCP connection | 752 -- New outgoing TCP connection |
634 local function addclient(addr, port, listeners, read_size, tls_ctx, typ) | 753 local function addclient(addr, port, listeners, read_size, tls_ctx, typ, extra) |
635 local create; | 754 local create; |
636 if not typ then | 755 if not typ then |
637 local n = inet_pton(addr); | 756 local n = inet_pton(addr); |
638 if not n then return nil, "invalid-ip"; end | 757 if not n then return nil, "invalid-ip"; end |
639 if #n == 16 then | 758 if #n == 16 then |
647 end | 766 end |
648 if type(create) ~= "function" then | 767 if type(create) ~= "function" then |
649 return nil, "invalid socket type"; | 768 return nil, "invalid socket type"; |
650 end | 769 end |
651 local conn, err = create(); | 770 local conn, err = create(); |
771 if not conn then return conn, err; end | |
652 local ok, err = conn:settimeout(0); | 772 local ok, err = conn:settimeout(0); |
653 if not ok then return ok, err; end | 773 if not ok then return ok, err; end |
654 local ok, err = conn:setpeername(addr, port); | 774 local ok, err = conn:setpeername(addr, port); |
655 if not ok and err ~= "timeout" then return ok, err; end | 775 if not ok and err ~= "timeout" then return ok, err; end |
656 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx) | 776 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra) |
657 local ok, err = client:init(); | 777 local ok, err = client:init(); |
778 if not client.peername then | |
779 -- otherwise not set until connected | |
780 client.peername, client.peerport = addr, port; | |
781 end | |
658 if not ok then return ok, err; end | 782 if not ok then return ok, err; end |
783 client:debug("Client %s created", client); | |
659 if tls_ctx then | 784 if tls_ctx then |
660 client:starttls(tls_ctx); | 785 client:starttls(tls_ctx); |
661 end | 786 end |
662 return client, conn; | 787 return client, conn; |
663 end | 788 end |
675 conn.getfd = function () | 800 conn.getfd = function () |
676 return fd; | 801 return fd; |
677 end; | 802 end; |
678 -- Otherwise it'll need to be something LuaSocket-compatible | 803 -- Otherwise it'll need to be something LuaSocket-compatible |
679 end | 804 end |
805 conn.id = new_id(); | |
806 conn.log = logger.init(("fdwatch%s"):format(conn.id)); | |
680 conn:add(onreadable, onwritable); | 807 conn:add(onreadable, onwritable); |
681 return conn; | 808 return conn; |
682 end; | 809 end; |
683 | 810 |
684 -- Dump all data from one connection into another | 811 -- Dump all data from one connection into another |
685 local function link(from, to) | 812 local function link(from, to, read_size) |
686 from.listeners = setmetatable({ | 813 from:debug("Linking to %s", to.id); |
687 onincoming = function (_, data) | 814 function from:onincoming(data) |
688 from:pause(); | 815 self:pause(); |
689 to:write(data); | 816 to:write(data); |
690 end, | 817 end |
691 }, {__index=from.listeners}); | 818 function to:ondrain() -- luacheck: ignore 212/self |
692 to.listeners = setmetatable({ | 819 from:resume(); |
693 ondrain = function () | 820 end |
694 from:resume(); | 821 from:set_mode(read_size); |
695 end, | |
696 }, {__index=to.listeners}); | |
697 from:set(true, nil); | 822 from:set(true, nil); |
698 to:set(nil, true); | 823 to:set(nil, true); |
699 end | 824 end |
700 | 825 |
701 -- COMPAT | 826 -- COMPAT |
750 return { | 875 return { |
751 get_backend = function () return "epoll"; end; | 876 get_backend = function () return "epoll"; end; |
752 addserver = addserver; | 877 addserver = addserver; |
753 addclient = addclient; | 878 addclient = addclient; |
754 add_task = addtimer; | 879 add_task = addtimer; |
755 at = at; | 880 listen = listen; |
756 loop = loop; | 881 loop = loop; |
757 closeall = closeall; | 882 closeall = closeall; |
758 setquitting = setquitting; | 883 setquitting = setquitting; |
759 wrapclient = wrapclient; | 884 wrapclient = wrapclient; |
760 watchfd = watchfd; | 885 watchfd = watchfd; |
764 end; | 889 end; |
765 | 890 |
766 -- libevent emulation | 891 -- libevent emulation |
767 event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 }; | 892 event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 }; |
768 addevent = function (fd, mode, callback) | 893 addevent = function (fd, mode, callback) |
894 log("warn", "Using deprecated libevent emulation, please update code to use watchfd API instead"); | |
769 local function onevent(self) | 895 local function onevent(self) |
770 local ret = self:callback(); | 896 local ret = self:callback(); |
771 if ret == -1 then | 897 if ret == -1 then |
772 self:set(false, false); | 898 self:set(false, false); |
773 elseif ret then | 899 elseif ret then |
783 close = function (self) | 909 close = function (self) |
784 self:del(); | 910 self:del(); |
785 fds[fd] = nil; | 911 fds[fd] = nil; |
786 end; | 912 end; |
787 }, interface_mt); | 913 }, interface_mt); |
914 conn.id = conn:getfd(); | |
915 conn.log = logger.init(("fdwatch%d"):format(conn.id)); | |
788 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); | 916 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); |
789 if not ok then return ok, err; end | 917 if not ok then return ok, err; end |
790 return conn; | 918 return conn; |
791 end; | 919 end; |
792 }; | 920 }; |