Software / code / prosody
Comparison
net/server_epoll.lua @ 10089:90e459f48cbd
net.server_epoll: Overhaul logging with one log sink per connection
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Fri, 26 Jul 2019 21:21:48 +0200 |
| parent | 10088:97da8452c6a6 |
| child | 10091:bd547587f48c |
comparison
equal
deleted
inserted
replaced
| 10088:97da8452c6a6 | 10089:90e459f48cbd |
|---|---|
| 12 local tostring = tostring; | 12 local tostring = tostring; |
| 13 local pcall = pcall; | 13 local pcall = pcall; |
| 14 local type = type; | 14 local type = type; |
| 15 local next = next; | 15 local next = next; |
| 16 local pairs = pairs; | 16 local pairs = pairs; |
| 17 local log = require "util.logger".init("server_epoll"); | 17 local logger = require "util.logger"; |
| 18 local log = logger.init("server_epoll"); | |
| 18 local socket = require "socket"; | 19 local socket = require "socket"; |
| 19 local luasec = require "ssl"; | 20 local luasec = require "ssl"; |
| 20 local gettime = require "util.time".now; | 21 local gettime = require "util.time".now; |
| 21 local indexedbheap = require "util.indexedbheap"; | 22 local indexedbheap = require "util.indexedbheap"; |
| 22 local createtable = require "util.table".create; | 23 local createtable = require "util.table".create; |
| 23 local inet = require "util.net"; | 24 local inet = require "util.net"; |
| 24 local inet_pton = inet.pton; | 25 local inet_pton = inet.pton; |
| 25 local _SOCKETINVALID = socket._SOCKETINVALID or -1; | 26 local _SOCKETINVALID = socket._SOCKETINVALID or -1; |
| 27 local new_id = require "util.id".medium; | |
| 26 | 28 |
| 27 local poller = require "util.poll" | 29 local poller = require "util.poll" |
| 28 local EEXIST = poller.EEXIST; | 30 local EEXIST = poller.EEXIST; |
| 29 local ENOENT = poller.ENOENT; | 31 local ENOENT = poller.ENOENT; |
| 30 | 32 |
| 143 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); | 145 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); |
| 144 end | 146 end |
| 145 return ("FD %d"):format(self:getfd()); | 147 return ("FD %d"):format(self:getfd()); |
| 146 end | 148 end |
| 147 | 149 |
| 150 interface.log = log; | |
| 151 function interface:debug(msg, ...) --luacheck: ignore 212/self | |
| 152 self.log("debug", msg, ...); | |
| 153 end | |
| 154 | |
| 155 function interface:error(msg, ...) --luacheck: ignore 212/self | |
| 156 self.log("error", msg, ...); | |
| 157 end | |
| 158 | |
| 148 -- Replace the listener and tell the old one | 159 -- Replace the listener and tell the old one |
| 149 function interface:setlistener(listeners, data) | 160 function interface:setlistener(listeners, data) |
| 150 self:on("detach"); | 161 self:on("detach"); |
| 151 self.listeners = listeners; | 162 self.listeners = listeners; |
| 152 self:on("attach", data); | 163 self:on("attach", data); |
| 153 end | 164 end |
| 154 | 165 |
| 155 -- Call a listener callback | 166 -- Call a listener callback |
| 156 function interface:on(what, ...) | 167 function interface:on(what, ...) |
| 157 if not self.listeners then | 168 if not self.listeners then |
| 158 log("error", "%s has no listeners", self); | 169 self:debug("Interface is missing listener callbacks"); |
| 159 return; | 170 return; |
| 160 end | 171 end |
| 161 local listener = self.listeners["on"..what]; | 172 local listener = self.listeners["on"..what]; |
| 162 if not listener then | 173 if not listener then |
| 163 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging | 174 -- self:debug("Missing listener 'on%s'", what); -- uncomment for development and debugging |
| 164 return; | 175 return; |
| 165 end | 176 end |
| 166 local ok, err = pcall(listener, self, ...); | 177 local ok, err = pcall(listener, self, ...); |
| 167 if not ok then | 178 if not ok then |
| 168 log("error", "Error calling on%s: %s", what, err); | |
| 169 if cfg.fatal_errors then | 179 if cfg.fatal_errors then |
| 170 log("debug", "Closing %s due to error in listener", self); | 180 self:debug("Closing due to error calling on%s: %s", what, err); |
| 171 self:destroy(); | 181 self:destroy(); |
| 182 else | |
| 183 self:debug("Error calling on%s: %s", what, err); | |
| 172 end | 184 end |
| 173 return nil, err; | 185 return nil, err; |
| 174 end | 186 end |
| 175 return err; | 187 return err; |
| 176 end | 188 end |
| 279 if r == nil then r = self._wantread; end | 291 if r == nil then r = self._wantread; end |
| 280 if w == nil then w = self._wantwrite; end | 292 if w == nil then w = self._wantwrite; end |
| 281 local ok, err, errno = poll:add(fd, r, w); | 293 local ok, err, errno = poll:add(fd, r, w); |
| 282 if not ok then | 294 if not ok then |
| 283 if errno == EEXIST then | 295 if errno == EEXIST then |
| 284 log("debug", "%s already registered!", self); | 296 self:debug("FD already registered in poller! (EEXIST)"); |
| 285 return self:set(r, w); -- So try to change its flags | 297 return self:set(r, w); -- So try to change its flags |
| 286 end | 298 end |
| 287 log("error", "Could not register %s: %s(%d)", self, err, errno); | 299 self:debug("Could not register in poller: %s(%d)", err, errno); |
| 288 return ok, err; | 300 return ok, err; |
| 289 end | 301 end |
| 290 self._wantread, self._wantwrite = r, w; | 302 self._wantread, self._wantwrite = r, w; |
| 291 fds[fd] = self; | 303 fds[fd] = self; |
| 292 log("debug", "Watching %s", self); | 304 self:debug("Registered in poller"); |
| 293 return true; | 305 return true; |
| 294 end | 306 end |
| 295 | 307 |
| 296 function interface:set(r, w) | 308 function interface:set(r, w) |
| 297 local fd = self:getfd(); | 309 local fd = self:getfd(); |
| 300 end | 312 end |
| 301 if r == nil then r = self._wantread; end | 313 if r == nil then r = self._wantread; end |
| 302 if w == nil then w = self._wantwrite; end | 314 if w == nil then w = self._wantwrite; end |
| 303 local ok, err, errno = poll:set(fd, r, w); | 315 local ok, err, errno = poll:set(fd, r, w); |
| 304 if not ok then | 316 if not ok then |
| 305 log("error", "Could not update poller state %s: %s(%d)", self, err, errno); | 317 self:debug("Could not update poller state: %s(%d)", err, errno); |
| 306 return ok, err; | 318 return ok, err; |
| 307 end | 319 end |
| 308 self._wantread, self._wantwrite = r, w; | 320 self._wantread, self._wantwrite = r, w; |
| 309 return true; | 321 return true; |
| 310 end | 322 end |
| 317 if fds[fd] ~= self then | 329 if fds[fd] ~= self then |
| 318 return nil, "unregistered fd"; | 330 return nil, "unregistered fd"; |
| 319 end | 331 end |
| 320 local ok, err, errno = poll:del(fd); | 332 local ok, err, errno = poll:del(fd); |
| 321 if not ok and errno ~= ENOENT then | 333 if not ok and errno ~= ENOENT then |
| 322 log("error", "Could not unregister %s: %s(%d)", self, err, errno); | 334 self:debug("Could not unregister: %s(%d)", err, errno); |
| 323 return ok, err; | 335 return ok, err; |
| 324 end | 336 end |
| 325 self._wantread, self._wantwrite = nil, nil; | 337 self._wantread, self._wantwrite = nil, nil; |
| 326 fds[fd] = nil; | 338 fds[fd] = nil; |
| 327 log("debug", "Unwatched %s", self); | 339 self:debug("Unregistered from poller"); |
| 328 return true; | 340 return true; |
| 329 end | 341 end |
| 330 | 342 |
| 331 function interface:setflags(r, w) | 343 function interface:setflags(r, w) |
| 332 if not(self._wantread or self._wantwrite) then | 344 if not(self._wantread or self._wantwrite) then |
| 430 -- Close, possibly after writing is done | 442 -- Close, possibly after writing is done |
| 431 function interface:close() | 443 function interface:close() |
| 432 if self.writebuffer and self.writebuffer[1] then | 444 if self.writebuffer and self.writebuffer[1] then |
| 433 self:set(false, true); -- Flush final buffer contents | 445 self:set(false, true); -- Flush final buffer contents |
| 434 self.write, self.send = noop, noop; -- No more writing | 446 self.write, self.send = noop, noop; -- No more writing |
| 435 log("debug", "Close %s after writing", self); | 447 self:debug("Close after writing"); |
| 436 self.ondrain = interface.close; | 448 self.ondrain = interface.close; |
| 437 else | 449 else |
| 438 log("debug", "Close %s now", self); | 450 self:debug("Closing now"); |
| 439 self.write, self.send = noop, noop; | 451 self.write, self.send = noop, noop; |
| 440 self.close = noop; | 452 self.close = noop; |
| 441 self:on("disconnect"); | 453 self:on("disconnect"); |
| 442 self:destroy(); | 454 self:destroy(); |
| 443 end | 455 end |
| 462 | 474 |
| 463 function interface:starttls(tls_ctx) | 475 function interface:starttls(tls_ctx) |
| 464 if tls_ctx then self.tls_ctx = tls_ctx; end | 476 if tls_ctx then self.tls_ctx = tls_ctx; end |
| 465 self.starttls = false; | 477 self.starttls = false; |
| 466 if self.writebuffer and self.writebuffer[1] then | 478 if self.writebuffer and self.writebuffer[1] then |
| 467 log("debug", "Start TLS on %s after write", self); | 479 self:debug("Start TLS after write"); |
| 468 self.ondrain = interface.starttls; | 480 self.ondrain = interface.starttls; |
| 469 self:set(nil, true); -- make sure wantwrite is set | 481 self:set(nil, true); -- make sure wantwrite is set |
| 470 else | 482 else |
| 471 if self.ondrain == interface.starttls then | 483 if self.ondrain == interface.starttls then |
| 472 self.ondrain = nil; | 484 self.ondrain = nil; |
| 473 end | 485 end |
| 474 self.onwritable = interface.tlshandskake; | 486 self.onwritable = interface.tlshandskake; |
| 475 self.onreadable = interface.tlshandskake; | 487 self.onreadable = interface.tlshandskake; |
| 476 self:set(true, true); | 488 self:set(true, true); |
| 477 log("debug", "Prepare to start TLS on %s", self); | 489 self:debug("Prepared to start TLS"); |
| 478 end | 490 end |
| 479 end | 491 end |
| 480 | 492 |
| 481 function interface:tlshandskake() | 493 function interface:tlshandskake() |
| 482 self:setwritetimeout(false); | 494 self:setwritetimeout(false); |
| 483 self:setreadtimeout(false); | 495 self:setreadtimeout(false); |
| 484 if not self._tls then | 496 if not self._tls then |
| 485 self._tls = true; | 497 self._tls = true; |
| 486 log("debug", "Start TLS on %s now", self); | 498 self:debug("Starting TLS now"); |
| 487 self:del(); | 499 self:del(); |
| 488 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); | 500 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); |
| 489 if not ok then | 501 if not ok then |
| 490 conn, err = ok, conn; | 502 conn, err = ok, conn; |
| 491 log("error", "Failed to initialize TLS: %s", err); | 503 self:debug("Failed to initialize TLS: %s", err); |
| 492 end | 504 end |
| 493 if not conn then | 505 if not conn then |
| 494 self:on("disconnect", err); | 506 self:on("disconnect", err); |
| 495 self:destroy(); | 507 self:destroy(); |
| 496 return conn, err; | 508 return conn, err; |
| 510 self.onreadable = interface.tlshandskake; | 522 self.onreadable = interface.tlshandskake; |
| 511 return self:init(); | 523 return self:init(); |
| 512 end | 524 end |
| 513 local ok, err = self.conn:dohandshake(); | 525 local ok, err = self.conn:dohandshake(); |
| 514 if ok then | 526 if ok then |
| 515 log("debug", "TLS handshake on %s complete", self); | 527 self:debug("TLS handshake complete"); |
| 516 self.onwritable = nil; | 528 self.onwritable = nil; |
| 517 self.onreadable = nil; | 529 self.onreadable = nil; |
| 518 self:on("status", "ssl-handshake-complete"); | 530 self:on("status", "ssl-handshake-complete"); |
| 519 self:setwritetimeout(); | 531 self:setwritetimeout(); |
| 520 self:set(true, true); | 532 self:set(true, true); |
| 521 elseif err == "wantread" then | 533 elseif err == "wantread" then |
| 522 log("debug", "TLS handshake on %s to wait until readable", self); | 534 self:debug("TLS handshake to wait until readable"); |
| 523 self:set(true, false); | 535 self:set(true, false); |
| 524 self:setreadtimeout(cfg.ssl_handshake_timeout); | 536 self:setreadtimeout(cfg.ssl_handshake_timeout); |
| 525 elseif err == "wantwrite" then | 537 elseif err == "wantwrite" then |
| 526 log("debug", "TLS handshake on %s to wait until writable", self); | 538 self:debug("TLS handshake to wait until writable"); |
| 527 self:set(false, true); | 539 self:set(false, true); |
| 528 self:setwritetimeout(cfg.ssl_handshake_timeout); | 540 self:setwritetimeout(cfg.ssl_handshake_timeout); |
| 529 else | 541 else |
| 530 log("debug", "TLS handshake error on %s: %s", self, err); | 542 self:debug("TLS handshake error: %s", err); |
| 531 self:on("disconnect", err); | 543 self:on("disconnect", err); |
| 532 self:destroy(); | 544 self:destroy(); |
| 533 end | 545 end |
| 534 end | 546 end |
| 535 | 547 |
| 542 listeners = listeners; | 554 listeners = listeners; |
| 543 read_size = read_size or (server and server.read_size); | 555 read_size = read_size or (server and server.read_size); |
| 544 writebuffer = {}; | 556 writebuffer = {}; |
| 545 tls_ctx = tls_ctx or (server and server.tls_ctx); | 557 tls_ctx = tls_ctx or (server and server.tls_ctx); |
| 546 tls_direct = server and server.tls_direct; | 558 tls_direct = server and server.tls_direct; |
| 559 log = logger.init(("conn%s"):format(new_id())); | |
| 547 }, interface_mt); | 560 }, interface_mt); |
| 548 | 561 |
| 549 conn:updatenames(); | 562 conn:updatenames(); |
| 550 return conn; | 563 return conn; |
| 551 end | 564 end |
| 565 -- A server interface has new incoming connections waiting | 578 -- A server interface has new incoming connections waiting |
| 566 -- This replaces the onreadable callback | 579 -- This replaces the onreadable callback |
| 567 function interface:onacceptable() | 580 function interface:onacceptable() |
| 568 local conn, err = self.conn:accept(); | 581 local conn, err = self.conn:accept(); |
| 569 if not conn then | 582 if not conn then |
| 570 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); | 583 self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); |
| 571 self:pausefor(cfg.accept_retry_interval); | 584 self:pausefor(cfg.accept_retry_interval); |
| 572 return; | 585 return; |
| 573 end | 586 end |
| 574 local client = wrapsocket(conn, self, nil, self.listeners); | 587 local client = wrapsocket(conn, self, nil, self.listeners); |
| 575 log("debug", "New connection %s", tostring(client)); | 588 client:debug("New connection %s on server %s", client, self); |
| 576 client:init(); | 589 client:init(); |
| 577 if self.tls_direct then | 590 if self.tls_direct then |
| 578 client:starttls(self.tls_ctx); | 591 client:starttls(self.tls_ctx); |
| 579 else | 592 else |
| 580 client:onconnect(); | 593 client:onconnect(); |
| 645 tls_ctx = config and config.tls_ctx; | 658 tls_ctx = config and config.tls_ctx; |
| 646 tls_direct = config and config.tls_direct; | 659 tls_direct = config and config.tls_direct; |
| 647 hosts = config and config.sni_hosts; | 660 hosts = config and config.sni_hosts; |
| 648 sockname = addr; | 661 sockname = addr; |
| 649 sockport = port; | 662 sockport = port; |
| 663 log = logger.init(("serv%s"):format(new_id())); | |
| 650 }, interface_mt); | 664 }, interface_mt); |
| 665 server:debug("Server %s created", server); | |
| 651 server:add(true, false); | 666 server:add(true, false); |
| 652 return server; | 667 return server; |
| 653 end | 668 end |
| 654 | 669 |
| 655 -- COMPAT | 670 -- COMPAT |
| 703 local ok, err = client:init(); | 718 local ok, err = client:init(); |
| 704 if not ok then return ok, err; end | 719 if not ok then return ok, err; end |
| 705 if tls_ctx then | 720 if tls_ctx then |
| 706 client:starttls(tls_ctx); | 721 client:starttls(tls_ctx); |
| 707 end | 722 end |
| 723 client:debug("Client %s created", client); | |
| 708 return client, conn; | 724 return client, conn; |
| 709 end | 725 end |
| 710 | 726 |
| 711 local function watchfd(fd, onreadable, onwritable) | 727 local function watchfd(fd, onreadable, onwritable) |
| 712 local conn = setmetatable({ | 728 local conn = setmetatable({ |
| 721 conn.getfd = function () | 737 conn.getfd = function () |
| 722 return fd; | 738 return fd; |
| 723 end; | 739 end; |
| 724 -- Otherwise it'll need to be something LuaSocket-compatible | 740 -- Otherwise it'll need to be something LuaSocket-compatible |
| 725 end | 741 end |
| 742 conn.log = logger.init(("fdwatch%s"):format(new_id())); | |
| 726 conn:add(onreadable, onwritable); | 743 conn:add(onreadable, onwritable); |
| 727 return conn; | 744 return conn; |
| 728 end; | 745 end; |
| 729 | 746 |
| 730 -- Dump all data from one connection into another | 747 -- Dump all data from one connection into another |
| 831 close = function (self) | 848 close = function (self) |
| 832 self:del(); | 849 self:del(); |
| 833 fds[fd] = nil; | 850 fds[fd] = nil; |
| 834 end; | 851 end; |
| 835 }, interface_mt); | 852 }, interface_mt); |
| 853 conn.log = logger.init(("fdwatch%d"):format(conn:getfd())); | |
| 836 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); | 854 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); |
| 837 if not ok then return ok, err; end | 855 if not ok then return ok, err; end |
| 838 return conn; | 856 return conn; |
| 839 end; | 857 end; |
| 840 }; | 858 }; |