Comparison

net/server_epoll.lua @ 10563:e8db377a2983

Merge 0.11->trunk
author Kim Alvefur <zash@zash.se>
date Tue, 24 Dec 2019 00:39:45 +0100
parent 10546:944863f878b9
child 10571:cfeb0077c9e9
comparison
equal deleted inserted replaced
10562:670afc079f68 10563:e8db377a2983
7 7
8 8
9 local t_insert = table.insert; 9 local t_insert = table.insert;
10 local t_concat = table.concat; 10 local t_concat = table.concat;
11 local setmetatable = setmetatable; 11 local setmetatable = setmetatable;
12 local tostring = tostring;
13 local pcall = pcall; 12 local pcall = pcall;
14 local type = type; 13 local type = type;
15 local next = next; 14 local next = next;
16 local pairs = pairs; 15 local pairs = pairs;
17 local log = require "util.logger".init("server_epoll"); 16 local logger = require "util.logger";
17 local log = logger.init("server_epoll");
18 local socket = require "socket"; 18 local socket = require "socket";
19 local luasec = require "ssl"; 19 local luasec = require "ssl";
20 local gettime = require "util.time".now; 20 local realtime = require "util.time".now;
21 local monotonic = require "util.time".monotonic;
21 local indexedbheap = require "util.indexedbheap"; 22 local indexedbheap = require "util.indexedbheap";
22 local createtable = require "util.table".create; 23 local createtable = require "util.table".create;
23 local inet = require "util.net"; 24 local inet = require "util.net";
24 local inet_pton = inet.pton; 25 local inet_pton = inet.pton;
25 local _SOCKETINVALID = socket._SOCKETINVALID or -1; 26 local _SOCKETINVALID = socket._SOCKETINVALID or -1;
27 local new_id = require "util.id".medium;
26 28
27 local poller = require "util.poll" 29 local poller = require "util.poll"
28 local EEXIST = poller.EEXIST; 30 local EEXIST = poller.EEXIST;
29 local ENOENT = poller.ENOENT; 31 local ENOENT = poller.ENOENT;
30 32
36 local default_config = { __index = { 38 local default_config = { __index = {
37 -- If a connection is silent for this long, close it unless onreadtimeout says not to 39 -- If a connection is silent for this long, close it unless onreadtimeout says not to
38 read_timeout = 14 * 60; 40 read_timeout = 14 * 60;
39 41
40 -- How long to wait for a socket to become writable after queuing data to send 42 -- How long to wait for a socket to become writable after queuing data to send
41 send_timeout = 60; 43 send_timeout = 180;
44
45 -- How long to wait for a socket to become writable after creation
46 connect_timeout = 20;
42 47
43 -- Some number possibly influencing how many pending connections can be accepted 48 -- Some number possibly influencing how many pending connections can be accepted
44 tcp_backlog = 128; 49 tcp_backlog = 128;
45 50
46 -- If accepting a new incoming connection fails, wait this long before trying again 51 -- If accepting a new incoming connection fails, wait this long before trying again
56 ssl_handshake_timeout = 60; 61 ssl_handshake_timeout = 60;
57 62
58 -- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers) 63 -- Maximum and minimum amount of time to sleep waiting for events (adjusted for pending timers)
59 max_wait = 86400; 64 max_wait = 86400;
60 min_wait = 1e-06; 65 min_wait = 1e-06;
66
67 -- EXPERIMENTAL
68 -- Whether to kill connections in case of callback errors.
69 fatal_errors = false;
70
71 -- Attempt writes instantly
72 opportunistic_writes = false;
61 }}; 73 }};
62 local cfg = default_config.__index; 74 local cfg = default_config.__index;
63 75
64 local fds = createtable(10, 0); -- FD -> conn 76 local fds = createtable(10, 0); -- FD -> conn
65 77
73 t[2] = noop; 85 t[2] = noop;
74 timers:remove(t.id); 86 timers:remove(t.id);
75 end 87 end
76 88
77 local function reschedule(t, time) 89 local function reschedule(t, time)
90 time = monotonic() + time;
78 t[1] = time; 91 t[1] = time;
79 timers:reprioritize(t.id, time); 92 timers:reprioritize(t.id, time);
80 end 93 end
81 94
82 -- Add absolute timer 95 -- Add relative timer
83 local function at(time, f) 96 local function addtimer(timeout, f)
97 local time = monotonic() + timeout;
84 local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; 98 local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
85 timer.id = timers:insert(timer, time); 99 timer.id = timers:insert(timer, time);
86 return timer; 100 return timer;
87 end
88
89 -- Add relative timer
90 local function addtimer(timeout, f)
91 return at(gettime() + timeout, f);
92 end 101 end
93 102
94 -- Run callbacks of expired timers 103 -- Run callbacks of expired timers
95 -- Return time until next timeout 104 -- Return time until next timeout
96 local function runtimers(next_delay, min_wait) 105 local function runtimers(next_delay, min_wait)
97 -- Any timers at all? 106 -- Any timers at all?
98 local now = gettime(); 107 local elapsed = monotonic();
108 local now = realtime();
99 local peek = timers:peek(); 109 local peek = timers:peek();
100 while peek do 110 while peek do
101 111
102 if peek > now then 112 if peek > elapsed then
103 next_delay = peek - now; 113 next_delay = peek - elapsed;
104 break; 114 break;
105 end 115 end
106 116
107 local _, timer, id = timers:pop(); 117 local _, timer = timers:pop();
108 local ok, ret = pcall(timer[2], now); 118 local ok, ret = pcall(timer[2], now);
109 if ok and type(ret) == "number" then 119 if ok and type(ret) == "number" then
110 local next_time = now+ret; 120 local next_time = elapsed+ret;
111 timer[1] = next_time; 121 timer[1] = next_time;
112 timers:insert(timer, next_time); 122 timers:insert(timer, next_time);
113 end 123 end
114 124
115 peek = timers:peek(); 125 peek = timers:peek();
116 end 126 end
117 if peek == nil then 127 if peek == nil then
118 return next_delay; 128 return next_delay;
119 end 129 end
120 130
121 if next_delay < min_wait then 131 if next_delay < min_wait then
136 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); 146 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport);
137 end 147 end
138 return ("FD %d"):format(self:getfd()); 148 return ("FD %d"):format(self:getfd());
139 end 149 end
140 150
151 interface.log = log;
152 function interface:debug(msg, ...) --luacheck: ignore 212/self
153 self.log("debug", msg, ...);
154 end
155
156 function interface:error(msg, ...) --luacheck: ignore 212/self
157 self.log("error", msg, ...);
158 end
159
141 -- Replace the listener and tell the old one 160 -- Replace the listener and tell the old one
142 function interface:setlistener(listeners, data) 161 function interface:setlistener(listeners, data)
143 self:on("detach"); 162 self:on("detach");
144 self.listeners = listeners; 163 self.listeners = listeners;
145 self:on("attach", data); 164 self:on("attach", data);
146 end 165 end
147 166
148 -- Call a listener callback 167 -- Call a listener callback
149 function interface:on(what, ...) 168 function interface:on(what, ...)
150 if not self.listeners then 169 if not self.listeners then
151 log("error", "%s has no listeners", self); 170 self:debug("Interface is missing listener callbacks");
152 return; 171 return;
153 end 172 end
154 local listener = self.listeners["on"..what]; 173 local listener = self.listeners["on"..what];
155 if not listener then 174 if not listener then
156 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging 175 -- self:debug("Missing listener 'on%s'", what); -- uncomment for development and debugging
157 return; 176 return;
158 end 177 end
159 local ok, err = pcall(listener, self, ...); 178 local ok, err = pcall(listener, self, ...);
160 if not ok then 179 if not ok then
161 log("error", "Error calling on%s: %s", what, err); 180 if cfg.fatal_errors then
181 self:debug("Closing due to error calling on%s: %s", what, err);
182 self:destroy();
183 else
184 self:debug("Error calling on%s: %s", what, err);
185 end
186 return nil, err;
162 end 187 end
163 return err; 188 return err;
189 end
190
191 -- Allow this one to be overridden
192 function interface:onincoming(...)
193 return self:on("incoming", ...);
164 end 194 end
165 195
166 -- Return the file descriptor number 196 -- Return the file descriptor number
167 function interface:getfd() 197 function interface:getfd()
168 if self.conn then 198 if self.conn then
224 end 254 end
225 return 255 return
226 end 256 end
227 t = t or cfg.read_timeout; 257 t = t or cfg.read_timeout;
228 if self._readtimeout then 258 if self._readtimeout then
229 self._readtimeout:reschedule(gettime() + t); 259 self._readtimeout:reschedule(t);
230 else 260 else
231 self._readtimeout = addtimer(t, function () 261 self._readtimeout = addtimer(t, function ()
232 if self:on("readtimeout") then 262 if self:on("readtimeout") then
263 self:debug("Read timeout handled");
233 return cfg.read_timeout; 264 return cfg.read_timeout;
234 else 265 else
266 self:debug("Read timeout not handled, disconnecting");
235 self:on("disconnect", "read timeout"); 267 self:on("disconnect", "read timeout");
236 self:destroy(); 268 self:destroy();
237 end 269 end
238 end); 270 end);
239 end 271 end
248 end 280 end
249 return 281 return
250 end 282 end
251 t = t or cfg.send_timeout; 283 t = t or cfg.send_timeout;
252 if self._writetimeout then 284 if self._writetimeout then
253 self._writetimeout:reschedule(gettime() + t); 285 self._writetimeout:reschedule(t);
254 else 286 else
255 self._writetimeout = addtimer(t, function () 287 self._writetimeout = addtimer(t, function ()
288 self:debug("Write timeout");
256 self:on("disconnect", "write timeout"); 289 self:on("disconnect", "write timeout");
257 self:destroy(); 290 self:destroy();
258 end); 291 end);
259 end 292 end
260 end 293 end
267 if r == nil then r = self._wantread; end 300 if r == nil then r = self._wantread; end
268 if w == nil then w = self._wantwrite; end 301 if w == nil then w = self._wantwrite; end
269 local ok, err, errno = poll:add(fd, r, w); 302 local ok, err, errno = poll:add(fd, r, w);
270 if not ok then 303 if not ok then
271 if errno == EEXIST then 304 if errno == EEXIST then
272 log("debug", "%s already registered!", self); 305 self:debug("FD already registered in poller! (EEXIST)");
273 return self:set(r, w); -- So try to change its flags 306 return self:set(r, w); -- So try to change its flags
274 end 307 end
275 log("error", "Could not register %s: %s(%d)", self, err, errno); 308 self:debug("Could not register in poller: %s(%d)", err, errno);
276 return ok, err; 309 return ok, err;
277 end 310 end
278 self._wantread, self._wantwrite = r, w; 311 self._wantread, self._wantwrite = r, w;
279 fds[fd] = self; 312 fds[fd] = self;
280 log("debug", "Watching %s", self); 313 self:debug("Registered in poller");
281 return true; 314 return true;
282 end 315 end
283 316
284 function interface:set(r, w) 317 function interface:set(r, w)
285 local fd = self:getfd(); 318 local fd = self:getfd();
288 end 321 end
289 if r == nil then r = self._wantread; end 322 if r == nil then r = self._wantread; end
290 if w == nil then w = self._wantwrite; end 323 if w == nil then w = self._wantwrite; end
291 local ok, err, errno = poll:set(fd, r, w); 324 local ok, err, errno = poll:set(fd, r, w);
292 if not ok then 325 if not ok then
293 log("error", "Could not update poller state %s: %s(%d)", self, err, errno); 326 self:debug("Could not update poller state: %s(%d)", err, errno);
294 return ok, err; 327 return ok, err;
295 end 328 end
296 self._wantread, self._wantwrite = r, w; 329 self._wantread, self._wantwrite = r, w;
297 return true; 330 return true;
298 end 331 end
305 if fds[fd] ~= self then 338 if fds[fd] ~= self then
306 return nil, "unregistered fd"; 339 return nil, "unregistered fd";
307 end 340 end
308 local ok, err, errno = poll:del(fd); 341 local ok, err, errno = poll:del(fd);
309 if not ok and errno ~= ENOENT then 342 if not ok and errno ~= ENOENT then
310 log("error", "Could not unregister %s: %s(%d)", self, err, errno); 343 self:debug("Could not unregister: %s(%d)", err, errno);
311 return ok, err; 344 return ok, err;
312 end 345 end
313 self._wantread, self._wantwrite = nil, nil; 346 self._wantread, self._wantwrite = nil, nil;
314 fds[fd] = nil; 347 fds[fd] = nil;
315 log("debug", "Unwatched %s", self); 348 self:debug("Unregistered from poller");
316 return true; 349 return true;
317 end 350 end
318 351
319 function interface:setflags(r, w) 352 function interface:setflags(r, w)
320 if not(self._wantread or self._wantwrite) then 353 if not(self._wantread or self._wantwrite) then
332 -- Called when socket is readable 365 -- Called when socket is readable
333 function interface:onreadable() 366 function interface:onreadable()
334 local data, err, partial = self.conn:receive(self.read_size or cfg.read_size); 367 local data, err, partial = self.conn:receive(self.read_size or cfg.read_size);
335 if data then 368 if data then
336 self:onconnect(); 369 self:onconnect();
337 self:on("incoming", data); 370 self:onincoming(data);
338 else 371 else
339 if err == "wantread" then 372 if err == "wantread" then
340 self:set(true, nil); 373 self:set(true, nil);
341 err = "timeout"; 374 err = "timeout";
342 elseif err == "wantwrite" then 375 elseif err == "wantwrite" then
343 self:set(nil, true); 376 self:set(nil, true);
344 err = "timeout"; 377 err = "timeout";
345 end 378 end
346 if partial and partial ~= "" then 379 if partial and partial ~= "" then
347 self:onconnect(); 380 self:onconnect();
348 self:on("incoming", partial, err); 381 self:onincoming(partial, err);
349 end 382 end
350 if err ~= "timeout" then 383 if err ~= "timeout" then
351 self:on("disconnect", err); 384 self:on("disconnect", err);
352 self:destroy() 385 self:destroy()
353 return; 386 return;
354 end 387 end
355 end 388 end
356 if not self.conn then return; end 389 if not self.conn then return; end
390 if self._limit and (data or partial) then
391 local cost = self._limit * #(data or partial);
392 if cost > cfg.min_wait then
393 self:setreadtimeout(false);
394 self:pausefor(cost);
395 return;
396 end
397 end
357 if self._wantread and self.conn:dirty() then 398 if self._wantread and self.conn:dirty() then
358 self:setreadtimeout(false); 399 self:setreadtimeout(false);
359 self:pausefor(cfg.read_retry_delay); 400 self:pausefor(cfg.read_retry_delay);
360 else 401 else
361 self:setreadtimeout(); 402 self:setreadtimeout();
376 end 417 end
377 self:setwritetimeout(false); 418 self:setwritetimeout(false);
378 self:ondrain(); -- Be aware of writes in ondrain 419 self:ondrain(); -- Be aware of writes in ondrain
379 return; 420 return;
380 elseif partial then 421 elseif partial then
422 self:debug("Sent %d out of %d buffered bytes", partial, #data);
381 buffer[1] = data:sub(partial+1); 423 buffer[1] = data:sub(partial+1);
382 for i = #buffer, 2, -1 do 424 for i = #buffer, 2, -1 do
383 buffer[i] = nil; 425 buffer[i] = nil;
384 end 426 end
427 self:set(nil, true);
385 self:setwritetimeout(); 428 self:setwritetimeout();
386 end 429 end
387 if err == "wantwrite" or err == "timeout" then 430 if err == "wantwrite" or err == "timeout" then
388 self:set(nil, true); 431 self:set(nil, true);
389 elseif err == "wantread" then 432 elseif err == "wantread" then
405 if buffer then 448 if buffer then
406 t_insert(buffer, data); 449 t_insert(buffer, data);
407 else 450 else
408 self.writebuffer = { data }; 451 self.writebuffer = { data };
409 end 452 end
410 self:setwritetimeout(); 453 if not self._write_lock then
411 self:set(nil, true); 454 if cfg.opportunistic_writes then
455 self:onwritable();
456 return #data;
457 end
458 self:setwritetimeout();
459 self:set(nil, true);
460 end
412 return #data; 461 return #data;
413 end 462 end
414 interface.send = interface.write; 463 interface.send = interface.write;
415 464
416 -- Close, possibly after writing is done 465 -- Close, possibly after writing is done
417 function interface:close() 466 function interface:close()
418 if self.writebuffer and self.writebuffer[1] then 467 if self.writebuffer and self.writebuffer[1] then
419 self:set(false, true); -- Flush final buffer contents 468 self:set(false, true); -- Flush final buffer contents
420 self.write, self.send = noop, noop; -- No more writing 469 self.write, self.send = noop, noop; -- No more writing
421 log("debug", "Close %s after writing", self); 470 self:debug("Close after writing remaining buffered data");
422 self.ondrain = interface.close; 471 self.ondrain = interface.close;
423 else 472 else
424 log("debug", "Close %s now", self); 473 self:debug("Closing now");
425 self.write, self.send = noop, noop; 474 self.write, self.send = noop, noop;
426 self.close = noop; 475 self.close = noop;
427 self:on("disconnect"); 476 self:on("disconnect");
428 self:destroy(); 477 self:destroy();
429 end 478 end
448 497
449 function interface:starttls(tls_ctx) 498 function interface:starttls(tls_ctx)
450 if tls_ctx then self.tls_ctx = tls_ctx; end 499 if tls_ctx then self.tls_ctx = tls_ctx; end
451 self.starttls = false; 500 self.starttls = false;
452 if self.writebuffer and self.writebuffer[1] then 501 if self.writebuffer and self.writebuffer[1] then
453 log("debug", "Start TLS on %s after write", self); 502 self:debug("Start TLS after write");
454 self.ondrain = interface.starttls; 503 self.ondrain = interface.starttls;
455 self:set(nil, true); -- make sure wantwrite is set 504 self:set(nil, true); -- make sure wantwrite is set
456 else 505 else
457 if self.ondrain == interface.starttls then 506 if self.ondrain == interface.starttls then
458 self.ondrain = nil; 507 self.ondrain = nil;
459 end 508 end
460 self.onwritable = interface.tlshandskake; 509 self.onwritable = interface.tlshandskake;
461 self.onreadable = interface.tlshandskake; 510 self.onreadable = interface.tlshandskake;
462 self:set(true, true); 511 self:set(true, true);
463 log("debug", "Prepare to start TLS on %s", self); 512 self:debug("Prepared to start TLS");
464 end 513 end
465 end 514 end
466 515
467 function interface:tlshandskake() 516 function interface:tlshandskake()
468 self:setwritetimeout(false); 517 self:setwritetimeout(false);
469 self:setreadtimeout(false); 518 self:setreadtimeout(false);
470 if not self._tls then 519 if not self._tls then
471 self._tls = true; 520 self._tls = true;
472 log("debug", "Start TLS on %s now", self); 521 self:debug("Starting TLS now");
473 self:del(); 522 self:del();
523 self:updatenames(); -- Can't getpeer/sockname after wrap()
474 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); 524 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx);
475 if not ok then 525 if not ok then
476 conn, err = ok, conn; 526 conn, err = ok, conn;
477 log("error", "Failed to initialize TLS: %s", err); 527 self:debug("Failed to initialize TLS: %s", err);
478 end 528 end
479 if not conn then 529 if not conn then
480 self:on("disconnect", err); 530 self:on("disconnect", err);
481 self:destroy(); 531 self:destroy();
482 return conn, err; 532 return conn, err;
483 end 533 end
484 conn:settimeout(0); 534 conn:settimeout(0);
485 self.conn = conn; 535 self.conn = conn;
536 if conn.sni then
537 if self.servername then
538 conn:sni(self.servername);
539 elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then
540 conn:sni(self._server.hosts, true);
541 end
542 end
486 self:on("starttls"); 543 self:on("starttls");
487 self.ondrain = nil; 544 self.ondrain = nil;
488 self.onwritable = interface.tlshandskake; 545 self.onwritable = interface.tlshandskake;
489 self.onreadable = interface.tlshandskake; 546 self.onreadable = interface.tlshandskake;
490 return self:init(); 547 return self:init();
491 end 548 end
492 local ok, err = self.conn:dohandshake(); 549 local ok, err = self.conn:dohandshake();
493 if ok then 550 if ok then
494 log("debug", "TLS handshake on %s complete", self); 551 local info = self.conn.info and self.conn:info();
552 if type(info) == "table" then
553 self:debug("TLS handshake complete (%s with %s)", info.protocol, info.cipher);
554 else
555 self:debug("TLS handshake complete");
556 end
495 self.onwritable = nil; 557 self.onwritable = nil;
496 self.onreadable = nil; 558 self.onreadable = nil;
497 self:on("status", "ssl-handshake-complete"); 559 self:on("status", "ssl-handshake-complete");
498 self:setwritetimeout(); 560 self:setwritetimeout();
499 self:set(true, true); 561 self:set(true, true);
500 elseif err == "wantread" then 562 elseif err == "wantread" then
501 log("debug", "TLS handshake on %s to wait until readable", self); 563 self:debug("TLS handshake to wait until readable");
502 self:set(true, false); 564 self:set(true, false);
503 self:setreadtimeout(cfg.ssl_handshake_timeout); 565 self:setreadtimeout(cfg.ssl_handshake_timeout);
504 elseif err == "wantwrite" then 566 elseif err == "wantwrite" then
505 log("debug", "TLS handshake on %s to wait until writable", self); 567 self:debug("TLS handshake to wait until writable");
506 self:set(false, true); 568 self:set(false, true);
507 self:setwritetimeout(cfg.ssl_handshake_timeout); 569 self:setwritetimeout(cfg.ssl_handshake_timeout);
508 else 570 else
509 log("debug", "TLS handshake error on %s: %s", self, err); 571 self:debug("TLS handshake error: %s", err);
510 self:on("disconnect", err); 572 self:on("disconnect", err);
511 self:destroy(); 573 self:destroy();
512 end 574 end
513 end 575 end
514 576
515 local function wrapsocket(client, server, read_size, listeners, tls_ctx) -- luasocket object -> interface object 577 local function wrapsocket(client, server, read_size, listeners, tls_ctx, extra) -- luasocket object -> interface object
516 client:settimeout(0); 578 client:settimeout(0);
579 local conn_id = ("conn%s"):format(new_id());
517 local conn = setmetatable({ 580 local conn = setmetatable({
518 conn = client; 581 conn = client;
519 _server = server; 582 _server = server;
520 created = gettime(); 583 created = realtime();
521 listeners = listeners; 584 listeners = listeners;
522 read_size = read_size or (server and server.read_size); 585 read_size = read_size or (server and server.read_size);
523 writebuffer = {}; 586 writebuffer = {};
524 tls_ctx = tls_ctx or (server and server.tls_ctx); 587 tls_ctx = tls_ctx or (server and server.tls_ctx);
525 tls_direct = server and server.tls_direct; 588 tls_direct = server and server.tls_direct;
589 id = conn_id;
590 log = logger.init(conn_id);
591 extra = extra;
526 }, interface_mt); 592 }, interface_mt);
593
594 if extra then
595 if extra.servername then
596 conn.servername = extra.servername;
597 end
598 end
527 599
528 conn:updatenames(); 600 conn:updatenames();
529 return conn; 601 return conn;
530 end 602 end
531 603
532 function interface:updatenames() 604 function interface:updatenames()
533 local conn = self.conn; 605 local conn = self.conn;
534 local ok, peername, peerport = pcall(conn.getpeername, conn); 606 local ok, peername, peerport = pcall(conn.getpeername, conn);
535 if ok then 607 if ok and peername then
536 self.peername, self.peerport = peername, peerport; 608 self.peername, self.peerport = peername, peerport;
537 end 609 end
538 local ok, sockname, sockport = pcall(conn.getsockname, conn); 610 local ok, sockname, sockport = pcall(conn.getsockname, conn);
539 if ok then 611 if ok and sockname then
540 self.sockname, self.sockport = sockname, sockport; 612 self.sockname, self.sockport = sockname, sockport;
541 end 613 end
542 end 614 end
543 615
544 -- A server interface has new incoming connections waiting 616 -- A server interface has new incoming connections waiting
545 -- This replaces the onreadable callback 617 -- This replaces the onreadable callback
546 function interface:onacceptable() 618 function interface:onacceptable()
547 local conn, err = self.conn:accept(); 619 local conn, err = self.conn:accept();
548 if not conn then 620 if not conn then
549 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); 621 self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
550 self:pausefor(cfg.accept_retry_interval); 622 self:pausefor(cfg.accept_retry_interval);
551 return; 623 return;
552 end 624 end
553 local client = wrapsocket(conn, self, nil, self.listeners); 625 local client = wrapsocket(conn, self, nil, self.listeners);
554 log("debug", "New connection %s", tostring(client)); 626 client:debug("New connection %s on server %s", client, self);
555 client:init(); 627 client:init();
556 if self.tls_direct then 628 if self.tls_direct then
557 client:starttls(self.tls_ctx); 629 client:starttls(self.tls_ctx);
630 else
631 client:onconnect();
558 end 632 end
559 end 633 end
560 634
561 -- Initialization 635 -- Initialization
562 function interface:init() 636 function interface:init()
563 self:setwritetimeout(); 637 self:setwritetimeout(cfg.connect_timeout);
564 return self:add(true, true); 638 return self:add(true, true);
565 end 639 end
566 640
567 function interface:pause() 641 function interface:pause()
642 self:debug("Pause reading");
568 return self:set(false); 643 return self:set(false);
569 end 644 end
570 645
571 function interface:resume() 646 function interface:resume()
647 self:debug("Resume reading");
572 return self:set(true); 648 return self:set(true);
573 end 649 end
574 650
575 -- Pause connection for some time 651 -- Pause connection for some time
576 function interface:pausefor(t) 652 function interface:pausefor(t)
653 self:debug("Pause for %fs", t);
577 if self._pausefor then 654 if self._pausefor then
578 self._pausefor:close(); 655 self._pausefor:close();
579 end 656 end
580 if t == false then return; end 657 if t == false then return; end
581 self:set(false); 658 self:set(false);
582 self._pausefor = addtimer(t, function () 659 self._pausefor = addtimer(t, function ()
583 self._pausefor = nil; 660 self._pausefor = nil;
584 self:set(true); 661 self:set(true);
662 self:debug("Resuming after pause, connection is %s", not self.conn and "missing" or self.conn:dirty() and "dirty" or "clean");
585 if self.conn and self.conn:dirty() then 663 if self.conn and self.conn:dirty() then
586 self:onreadable(); 664 self:onreadable();
587 end 665 end
588 end); 666 end);
589 end 667 end
590 668
669 function interface:setlimit(Bps)
670 if Bps > 0 then
671 self._limit = 1/Bps;
672 else
673 self._limit = nil;
674 end
675 end
676
677 function interface:pause_writes()
678 if self._write_lock then
679 return
680 end
681 self:debug("Pause writes");
682 self._write_lock = true;
683 self:setwritetimeout(false);
684 self:set(nil, false);
685 end
686
687 function interface:resume_writes()
688 if not self._write_lock then
689 return
690 end
691 self:debug("Resume writes");
692 self._write_lock = nil;
693 if self.writebuffer[1] then
694 self:setwritetimeout();
695 self:set(nil, true);
696 end
697 end
698
591 -- Connected! 699 -- Connected!
592 function interface:onconnect() 700 function interface:onconnect()
593 if self.conn and not self.peername and self.conn.getpeername then 701 self:updatenames();
594 self.peername, self.peerport = self.conn:getpeername(); 702 self:debug("Connected (%s)", self);
595 end
596 self.onconnect = noop; 703 self.onconnect = noop;
597 self:on("connect"); 704 self:on("connect");
598 end 705 end
599 706
600 local function addserver(addr, port, listeners, read_size, tls_ctx) 707 local function listen(addr, port, listeners, config)
601 local conn, err = socket.bind(addr, port, cfg.tcp_backlog); 708 local conn, err = socket.bind(addr, port, cfg.tcp_backlog);
602 if not conn then return conn, err; end 709 if not conn then return conn, err; end
603 conn:settimeout(0); 710 conn:settimeout(0);
604 local server = setmetatable({ 711 local server = setmetatable({
605 conn = conn; 712 conn = conn;
606 created = gettime(); 713 created = realtime();
607 listeners = listeners; 714 listeners = listeners;
715 read_size = config and config.read_size;
716 onreadable = interface.onacceptable;
717 tls_ctx = config and config.tls_ctx;
718 tls_direct = config and config.tls_direct;
719 hosts = config and config.sni_hosts;
720 sockname = addr;
721 sockport = port;
722 log = logger.init(("serv%s"):format(new_id()));
723 }, interface_mt);
724 server:debug("Server %s created", server);
725 server:add(true, false);
726 return server;
727 end
728
729 -- COMPAT
730 local function addserver(addr, port, listeners, read_size, tls_ctx)
731 return listen(addr, port, listeners, {
608 read_size = read_size; 732 read_size = read_size;
609 onreadable = interface.onacceptable;
610 tls_ctx = tls_ctx; 733 tls_ctx = tls_ctx;
611 tls_direct = tls_ctx and true or false; 734 tls_direct = tls_ctx and true or false;
612 sockname = addr; 735 });
613 sockport = port;
614 }, interface_mt);
615 server:add(true, false);
616 return server;
617 end 736 end
618 737
619 -- COMPAT 738 -- COMPAT
620 local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx) 739 local function wrapclient(conn, addr, port, listeners, read_size, tls_ctx, extra)
621 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx); 740 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra);
622 if not client.peername then 741 if not client.peername then
623 client.peername, client.peerport = addr, port; 742 client.peername, client.peerport = addr, port;
624 end 743 end
625 local ok, err = client:init(); 744 local ok, err = client:init();
626 if not ok then return ok, err; end 745 if not ok then return ok, err; end
629 end 748 end
630 return client; 749 return client;
631 end 750 end
632 751
633 -- New outgoing TCP connection 752 -- New outgoing TCP connection
634 local function addclient(addr, port, listeners, read_size, tls_ctx, typ) 753 local function addclient(addr, port, listeners, read_size, tls_ctx, typ, extra)
635 local create; 754 local create;
636 if not typ then 755 if not typ then
637 local n = inet_pton(addr); 756 local n = inet_pton(addr);
638 if not n then return nil, "invalid-ip"; end 757 if not n then return nil, "invalid-ip"; end
639 if #n == 16 then 758 if #n == 16 then
647 end 766 end
648 if type(create) ~= "function" then 767 if type(create) ~= "function" then
649 return nil, "invalid socket type"; 768 return nil, "invalid socket type";
650 end 769 end
651 local conn, err = create(); 770 local conn, err = create();
771 if not conn then return conn, err; end
652 local ok, err = conn:settimeout(0); 772 local ok, err = conn:settimeout(0);
653 if not ok then return ok, err; end 773 if not ok then return ok, err; end
654 local ok, err = conn:setpeername(addr, port); 774 local ok, err = conn:setpeername(addr, port);
655 if not ok and err ~= "timeout" then return ok, err; end 775 if not ok and err ~= "timeout" then return ok, err; end
656 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx) 776 local client = wrapsocket(conn, nil, read_size, listeners, tls_ctx, extra)
657 local ok, err = client:init(); 777 local ok, err = client:init();
778 if not client.peername then
779 -- otherwise not set until connected
780 client.peername, client.peerport = addr, port;
781 end
658 if not ok then return ok, err; end 782 if not ok then return ok, err; end
783 client:debug("Client %s created", client);
659 if tls_ctx then 784 if tls_ctx then
660 client:starttls(tls_ctx); 785 client:starttls(tls_ctx);
661 end 786 end
662 return client, conn; 787 return client, conn;
663 end 788 end
675 conn.getfd = function () 800 conn.getfd = function ()
676 return fd; 801 return fd;
677 end; 802 end;
678 -- Otherwise it'll need to be something LuaSocket-compatible 803 -- Otherwise it'll need to be something LuaSocket-compatible
679 end 804 end
805 conn.id = new_id();
806 conn.log = logger.init(("fdwatch%s"):format(conn.id));
680 conn:add(onreadable, onwritable); 807 conn:add(onreadable, onwritable);
681 return conn; 808 return conn;
682 end; 809 end;
683 810
684 -- Dump all data from one connection into another 811 -- Dump all data from one connection into another
685 local function link(from, to) 812 local function link(from, to, read_size)
686 from.listeners = setmetatable({ 813 from:debug("Linking to %s", to.id);
687 onincoming = function (_, data) 814 function from:onincoming(data)
688 from:pause(); 815 self:pause();
689 to:write(data); 816 to:write(data);
690 end, 817 end
691 }, {__index=from.listeners}); 818 function to:ondrain() -- luacheck: ignore 212/self
692 to.listeners = setmetatable({ 819 from:resume();
693 ondrain = function () 820 end
694 from:resume(); 821 from:set_mode(read_size);
695 end,
696 }, {__index=to.listeners});
697 from:set(true, nil); 822 from:set(true, nil);
698 to:set(nil, true); 823 to:set(nil, true);
699 end 824 end
700 825
701 -- COMPAT 826 -- COMPAT
750 return { 875 return {
751 get_backend = function () return "epoll"; end; 876 get_backend = function () return "epoll"; end;
752 addserver = addserver; 877 addserver = addserver;
753 addclient = addclient; 878 addclient = addclient;
754 add_task = addtimer; 879 add_task = addtimer;
755 at = at; 880 listen = listen;
756 loop = loop; 881 loop = loop;
757 closeall = closeall; 882 closeall = closeall;
758 setquitting = setquitting; 883 setquitting = setquitting;
759 wrapclient = wrapclient; 884 wrapclient = wrapclient;
760 watchfd = watchfd; 885 watchfd = watchfd;
764 end; 889 end;
765 890
766 -- libevent emulation 891 -- libevent emulation
767 event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 }; 892 event = { EV_READ = "r", EV_WRITE = "w", EV_READWRITE = "rw", EV_LEAVE = -1 };
768 addevent = function (fd, mode, callback) 893 addevent = function (fd, mode, callback)
894 log("warn", "Using deprecated libevent emulation, please update code to use watchfd API instead");
769 local function onevent(self) 895 local function onevent(self)
770 local ret = self:callback(); 896 local ret = self:callback();
771 if ret == -1 then 897 if ret == -1 then
772 self:set(false, false); 898 self:set(false, false);
773 elseif ret then 899 elseif ret then
783 close = function (self) 909 close = function (self)
784 self:del(); 910 self:del();
785 fds[fd] = nil; 911 fds[fd] = nil;
786 end; 912 end;
787 }, interface_mt); 913 }, interface_mt);
914 conn.id = conn:getfd();
915 conn.log = logger.init(("fdwatch%d"):format(conn.id));
788 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); 916 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw");
789 if not ok then return ok, err; end 917 if not ok then return ok, err; end
790 return conn; 918 return conn;
791 end; 919 end;
792 }; 920 };