Comparison

net/server_epoll.lua @ 10089:90e459f48cbd

net.server_epoll: Overhaul logging with one log sink per connection
author Kim Alvefur <zash@zash.se>
date Fri, 26 Jul 2019 21:21:48 +0200
parent 10088:97da8452c6a6
child 10091:bd547587f48c
comparison
equal deleted inserted replaced
10088:97da8452c6a6 10089:90e459f48cbd
12 local tostring = tostring; 12 local tostring = tostring;
13 local pcall = pcall; 13 local pcall = pcall;
14 local type = type; 14 local type = type;
15 local next = next; 15 local next = next;
16 local pairs = pairs; 16 local pairs = pairs;
17 local log = require "util.logger".init("server_epoll"); 17 local logger = require "util.logger";
18 local log = logger.init("server_epoll");
18 local socket = require "socket"; 19 local socket = require "socket";
19 local luasec = require "ssl"; 20 local luasec = require "ssl";
20 local gettime = require "util.time".now; 21 local gettime = require "util.time".now;
21 local indexedbheap = require "util.indexedbheap"; 22 local indexedbheap = require "util.indexedbheap";
22 local createtable = require "util.table".create; 23 local createtable = require "util.table".create;
23 local inet = require "util.net"; 24 local inet = require "util.net";
24 local inet_pton = inet.pton; 25 local inet_pton = inet.pton;
25 local _SOCKETINVALID = socket._SOCKETINVALID or -1; 26 local _SOCKETINVALID = socket._SOCKETINVALID or -1;
27 local new_id = require "util.id".medium;
26 28
27 local poller = require "util.poll" 29 local poller = require "util.poll"
28 local EEXIST = poller.EEXIST; 30 local EEXIST = poller.EEXIST;
29 local ENOENT = poller.ENOENT; 31 local ENOENT = poller.ENOENT;
30 32
143 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); 145 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport);
144 end 146 end
145 return ("FD %d"):format(self:getfd()); 147 return ("FD %d"):format(self:getfd());
146 end 148 end
147 149
150 interface.log = log;
151 function interface:debug(msg, ...) --luacheck: ignore 212/self
152 self.log("debug", msg, ...);
153 end
154
155 function interface:error(msg, ...) --luacheck: ignore 212/self
156 self.log("error", msg, ...);
157 end
158
148 -- Replace the listener and tell the old one 159 -- Replace the listener and tell the old one
149 function interface:setlistener(listeners, data) 160 function interface:setlistener(listeners, data)
150 self:on("detach"); 161 self:on("detach");
151 self.listeners = listeners; 162 self.listeners = listeners;
152 self:on("attach", data); 163 self:on("attach", data);
153 end 164 end
154 165
155 -- Call a listener callback 166 -- Call a listener callback
156 function interface:on(what, ...) 167 function interface:on(what, ...)
157 if not self.listeners then 168 if not self.listeners then
158 log("error", "%s has no listeners", self); 169 self:debug("Interface is missing listener callbacks");
159 return; 170 return;
160 end 171 end
161 local listener = self.listeners["on"..what]; 172 local listener = self.listeners["on"..what];
162 if not listener then 173 if not listener then
163 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging 174 -- self:debug("Missing listener 'on%s'", what); -- uncomment for development and debugging
164 return; 175 return;
165 end 176 end
166 local ok, err = pcall(listener, self, ...); 177 local ok, err = pcall(listener, self, ...);
167 if not ok then 178 if not ok then
168 log("error", "Error calling on%s: %s", what, err);
169 if cfg.fatal_errors then 179 if cfg.fatal_errors then
170 log("debug", "Closing %s due to error in listener", self); 180 self:debug("Closing due to error calling on%s: %s", what, err);
171 self:destroy(); 181 self:destroy();
182 else
183 self:debug("Error calling on%s: %s", what, err);
172 end 184 end
173 return nil, err; 185 return nil, err;
174 end 186 end
175 return err; 187 return err;
176 end 188 end
279 if r == nil then r = self._wantread; end 291 if r == nil then r = self._wantread; end
280 if w == nil then w = self._wantwrite; end 292 if w == nil then w = self._wantwrite; end
281 local ok, err, errno = poll:add(fd, r, w); 293 local ok, err, errno = poll:add(fd, r, w);
282 if not ok then 294 if not ok then
283 if errno == EEXIST then 295 if errno == EEXIST then
284 log("debug", "%s already registered!", self); 296 self:debug("FD already registered in poller! (EEXIST)");
285 return self:set(r, w); -- So try to change its flags 297 return self:set(r, w); -- So try to change its flags
286 end 298 end
287 log("error", "Could not register %s: %s(%d)", self, err, errno); 299 self:debug("Could not register in poller: %s(%d)", err, errno);
288 return ok, err; 300 return ok, err;
289 end 301 end
290 self._wantread, self._wantwrite = r, w; 302 self._wantread, self._wantwrite = r, w;
291 fds[fd] = self; 303 fds[fd] = self;
292 log("debug", "Watching %s", self); 304 self:debug("Registered in poller");
293 return true; 305 return true;
294 end 306 end
295 307
296 function interface:set(r, w) 308 function interface:set(r, w)
297 local fd = self:getfd(); 309 local fd = self:getfd();
300 end 312 end
301 if r == nil then r = self._wantread; end 313 if r == nil then r = self._wantread; end
302 if w == nil then w = self._wantwrite; end 314 if w == nil then w = self._wantwrite; end
303 local ok, err, errno = poll:set(fd, r, w); 315 local ok, err, errno = poll:set(fd, r, w);
304 if not ok then 316 if not ok then
305 log("error", "Could not update poller state %s: %s(%d)", self, err, errno); 317 self:debug("Could not update poller state: %s(%d)", err, errno);
306 return ok, err; 318 return ok, err;
307 end 319 end
308 self._wantread, self._wantwrite = r, w; 320 self._wantread, self._wantwrite = r, w;
309 return true; 321 return true;
310 end 322 end
317 if fds[fd] ~= self then 329 if fds[fd] ~= self then
318 return nil, "unregistered fd"; 330 return nil, "unregistered fd";
319 end 331 end
320 local ok, err, errno = poll:del(fd); 332 local ok, err, errno = poll:del(fd);
321 if not ok and errno ~= ENOENT then 333 if not ok and errno ~= ENOENT then
322 log("error", "Could not unregister %s: %s(%d)", self, err, errno); 334 self:debug("Could not unregister: %s(%d)", err, errno);
323 return ok, err; 335 return ok, err;
324 end 336 end
325 self._wantread, self._wantwrite = nil, nil; 337 self._wantread, self._wantwrite = nil, nil;
326 fds[fd] = nil; 338 fds[fd] = nil;
327 log("debug", "Unwatched %s", self); 339 self:debug("Unregistered from poller");
328 return true; 340 return true;
329 end 341 end
330 342
331 function interface:setflags(r, w) 343 function interface:setflags(r, w)
332 if not(self._wantread or self._wantwrite) then 344 if not(self._wantread or self._wantwrite) then
430 -- Close, possibly after writing is done 442 -- Close, possibly after writing is done
431 function interface:close() 443 function interface:close()
432 if self.writebuffer and self.writebuffer[1] then 444 if self.writebuffer and self.writebuffer[1] then
433 self:set(false, true); -- Flush final buffer contents 445 self:set(false, true); -- Flush final buffer contents
434 self.write, self.send = noop, noop; -- No more writing 446 self.write, self.send = noop, noop; -- No more writing
435 log("debug", "Close %s after writing", self); 447 self:debug("Close after writing");
436 self.ondrain = interface.close; 448 self.ondrain = interface.close;
437 else 449 else
438 log("debug", "Close %s now", self); 450 self:debug("Closing now");
439 self.write, self.send = noop, noop; 451 self.write, self.send = noop, noop;
440 self.close = noop; 452 self.close = noop;
441 self:on("disconnect"); 453 self:on("disconnect");
442 self:destroy(); 454 self:destroy();
443 end 455 end
462 474
463 function interface:starttls(tls_ctx) 475 function interface:starttls(tls_ctx)
464 if tls_ctx then self.tls_ctx = tls_ctx; end 476 if tls_ctx then self.tls_ctx = tls_ctx; end
465 self.starttls = false; 477 self.starttls = false;
466 if self.writebuffer and self.writebuffer[1] then 478 if self.writebuffer and self.writebuffer[1] then
467 log("debug", "Start TLS on %s after write", self); 479 self:debug("Start TLS after write");
468 self.ondrain = interface.starttls; 480 self.ondrain = interface.starttls;
469 self:set(nil, true); -- make sure wantwrite is set 481 self:set(nil, true); -- make sure wantwrite is set
470 else 482 else
471 if self.ondrain == interface.starttls then 483 if self.ondrain == interface.starttls then
472 self.ondrain = nil; 484 self.ondrain = nil;
473 end 485 end
474 self.onwritable = interface.tlshandskake; 486 self.onwritable = interface.tlshandskake;
475 self.onreadable = interface.tlshandskake; 487 self.onreadable = interface.tlshandskake;
476 self:set(true, true); 488 self:set(true, true);
477 log("debug", "Prepare to start TLS on %s", self); 489 self:debug("Prepared to start TLS");
478 end 490 end
479 end 491 end
480 492
481 function interface:tlshandskake() 493 function interface:tlshandskake()
482 self:setwritetimeout(false); 494 self:setwritetimeout(false);
483 self:setreadtimeout(false); 495 self:setreadtimeout(false);
484 if not self._tls then 496 if not self._tls then
485 self._tls = true; 497 self._tls = true;
486 log("debug", "Start TLS on %s now", self); 498 self:debug("Starting TLS now");
487 self:del(); 499 self:del();
488 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); 500 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx);
489 if not ok then 501 if not ok then
490 conn, err = ok, conn; 502 conn, err = ok, conn;
491 log("error", "Failed to initialize TLS: %s", err); 503 self:debug("Failed to initialize TLS: %s", err);
492 end 504 end
493 if not conn then 505 if not conn then
494 self:on("disconnect", err); 506 self:on("disconnect", err);
495 self:destroy(); 507 self:destroy();
496 return conn, err; 508 return conn, err;
510 self.onreadable = interface.tlshandskake; 522 self.onreadable = interface.tlshandskake;
511 return self:init(); 523 return self:init();
512 end 524 end
513 local ok, err = self.conn:dohandshake(); 525 local ok, err = self.conn:dohandshake();
514 if ok then 526 if ok then
515 log("debug", "TLS handshake on %s complete", self); 527 self:debug("TLS handshake complete");
516 self.onwritable = nil; 528 self.onwritable = nil;
517 self.onreadable = nil; 529 self.onreadable = nil;
518 self:on("status", "ssl-handshake-complete"); 530 self:on("status", "ssl-handshake-complete");
519 self:setwritetimeout(); 531 self:setwritetimeout();
520 self:set(true, true); 532 self:set(true, true);
521 elseif err == "wantread" then 533 elseif err == "wantread" then
522 log("debug", "TLS handshake on %s to wait until readable", self); 534 self:debug("TLS handshake to wait until readable");
523 self:set(true, false); 535 self:set(true, false);
524 self:setreadtimeout(cfg.ssl_handshake_timeout); 536 self:setreadtimeout(cfg.ssl_handshake_timeout);
525 elseif err == "wantwrite" then 537 elseif err == "wantwrite" then
526 log("debug", "TLS handshake on %s to wait until writable", self); 538 self:debug("TLS handshake to wait until writable");
527 self:set(false, true); 539 self:set(false, true);
528 self:setwritetimeout(cfg.ssl_handshake_timeout); 540 self:setwritetimeout(cfg.ssl_handshake_timeout);
529 else 541 else
530 log("debug", "TLS handshake error on %s: %s", self, err); 542 self:debug("TLS handshake error: %s", err);
531 self:on("disconnect", err); 543 self:on("disconnect", err);
532 self:destroy(); 544 self:destroy();
533 end 545 end
534 end 546 end
535 547
542 listeners = listeners; 554 listeners = listeners;
543 read_size = read_size or (server and server.read_size); 555 read_size = read_size or (server and server.read_size);
544 writebuffer = {}; 556 writebuffer = {};
545 tls_ctx = tls_ctx or (server and server.tls_ctx); 557 tls_ctx = tls_ctx or (server and server.tls_ctx);
546 tls_direct = server and server.tls_direct; 558 tls_direct = server and server.tls_direct;
559 log = logger.init(("conn%s"):format(new_id()));
547 }, interface_mt); 560 }, interface_mt);
548 561
549 conn:updatenames(); 562 conn:updatenames();
550 return conn; 563 return conn;
551 end 564 end
565 -- A server interface has new incoming connections waiting 578 -- A server interface has new incoming connections waiting
566 -- This replaces the onreadable callback 579 -- This replaces the onreadable callback
567 function interface:onacceptable() 580 function interface:onacceptable()
568 local conn, err = self.conn:accept(); 581 local conn, err = self.conn:accept();
569 if not conn then 582 if not conn then
570 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); 583 self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
571 self:pausefor(cfg.accept_retry_interval); 584 self:pausefor(cfg.accept_retry_interval);
572 return; 585 return;
573 end 586 end
574 local client = wrapsocket(conn, self, nil, self.listeners); 587 local client = wrapsocket(conn, self, nil, self.listeners);
575 log("debug", "New connection %s", tostring(client)); 588 client:debug("New connection %s on server %s", client, self);
576 client:init(); 589 client:init();
577 if self.tls_direct then 590 if self.tls_direct then
578 client:starttls(self.tls_ctx); 591 client:starttls(self.tls_ctx);
579 else 592 else
580 client:onconnect(); 593 client:onconnect();
645 tls_ctx = config and config.tls_ctx; 658 tls_ctx = config and config.tls_ctx;
646 tls_direct = config and config.tls_direct; 659 tls_direct = config and config.tls_direct;
647 hosts = config and config.sni_hosts; 660 hosts = config and config.sni_hosts;
648 sockname = addr; 661 sockname = addr;
649 sockport = port; 662 sockport = port;
663 log = logger.init(("serv%s"):format(new_id()));
650 }, interface_mt); 664 }, interface_mt);
665 server:debug("Server %s created", server);
651 server:add(true, false); 666 server:add(true, false);
652 return server; 667 return server;
653 end 668 end
654 669
655 -- COMPAT 670 -- COMPAT
703 local ok, err = client:init(); 718 local ok, err = client:init();
704 if not ok then return ok, err; end 719 if not ok then return ok, err; end
705 if tls_ctx then 720 if tls_ctx then
706 client:starttls(tls_ctx); 721 client:starttls(tls_ctx);
707 end 722 end
723 client:debug("Client %s created", client);
708 return client, conn; 724 return client, conn;
709 end 725 end
710 726
711 local function watchfd(fd, onreadable, onwritable) 727 local function watchfd(fd, onreadable, onwritable)
712 local conn = setmetatable({ 728 local conn = setmetatable({
721 conn.getfd = function () 737 conn.getfd = function ()
722 return fd; 738 return fd;
723 end; 739 end;
724 -- Otherwise it'll need to be something LuaSocket-compatible 740 -- Otherwise it'll need to be something LuaSocket-compatible
725 end 741 end
742 conn.log = logger.init(("fdwatch%s"):format(new_id()));
726 conn:add(onreadable, onwritable); 743 conn:add(onreadable, onwritable);
727 return conn; 744 return conn;
728 end; 745 end;
729 746
730 -- Dump all data from one connection into another 747 -- Dump all data from one connection into another
831 close = function (self) 848 close = function (self)
832 self:del(); 849 self:del();
833 fds[fd] = nil; 850 fds[fd] = nil;
834 end; 851 end;
835 }, interface_mt); 852 }, interface_mt);
853 conn.log = logger.init(("fdwatch%d"):format(conn:getfd()));
836 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); 854 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw");
837 if not ok then return ok, err; end 855 if not ok then return ok, err; end
838 return conn; 856 return conn;
839 end; 857 end;
840 }; 858 };