Software / code / prosody
Comparison
net/server_epoll.lua @ 10089:90e459f48cbd
net.server_epoll: Overhaul logging with one log sink per connection
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Fri, 26 Jul 2019 21:21:48 +0200 |
parent | 10088:97da8452c6a6 |
child | 10091:bd547587f48c |
Comparison legend: equal, deleted, inserted, replaced
10088:97da8452c6a6 | 10089:90e459f48cbd |
---|---|
12 local tostring = tostring; | 12 local tostring = tostring; |
13 local pcall = pcall; | 13 local pcall = pcall; |
14 local type = type; | 14 local type = type; |
15 local next = next; | 15 local next = next; |
16 local pairs = pairs; | 16 local pairs = pairs; |
17 local log = require "util.logger".init("server_epoll"); | 17 local logger = require "util.logger"; |
18 local log = logger.init("server_epoll"); | |
18 local socket = require "socket"; | 19 local socket = require "socket"; |
19 local luasec = require "ssl"; | 20 local luasec = require "ssl"; |
20 local gettime = require "util.time".now; | 21 local gettime = require "util.time".now; |
21 local indexedbheap = require "util.indexedbheap"; | 22 local indexedbheap = require "util.indexedbheap"; |
22 local createtable = require "util.table".create; | 23 local createtable = require "util.table".create; |
23 local inet = require "util.net"; | 24 local inet = require "util.net"; |
24 local inet_pton = inet.pton; | 25 local inet_pton = inet.pton; |
25 local _SOCKETINVALID = socket._SOCKETINVALID or -1; | 26 local _SOCKETINVALID = socket._SOCKETINVALID or -1; |
27 local new_id = require "util.id".medium; | |
26 | 28 |
27 local poller = require "util.poll" | 29 local poller = require "util.poll" |
28 local EEXIST = poller.EEXIST; | 30 local EEXIST = poller.EEXIST; |
29 local ENOENT = poller.ENOENT; | 31 local ENOENT = poller.ENOENT; |
30 | 32 |
143 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); | 145 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); |
144 end | 146 end |
145 return ("FD %d"):format(self:getfd()); | 147 return ("FD %d"):format(self:getfd()); |
146 end | 148 end |
147 | 149 |
150 interface.log = log; | |
151 function interface:debug(msg, ...) --luacheck: ignore 212/self | |
152 self.log("debug", msg, ...); | |
153 end | |
154 | |
155 function interface:error(msg, ...) --luacheck: ignore 212/self | |
156 self.log("error", msg, ...); | |
157 end | |
158 | |
148 -- Replace the listener and tell the old one | 159 -- Replace the listener and tell the old one |
149 function interface:setlistener(listeners, data) | 160 function interface:setlistener(listeners, data) |
150 self:on("detach"); | 161 self:on("detach"); |
151 self.listeners = listeners; | 162 self.listeners = listeners; |
152 self:on("attach", data); | 163 self:on("attach", data); |
153 end | 164 end |
154 | 165 |
155 -- Call a listener callback | 166 -- Call a listener callback |
156 function interface:on(what, ...) | 167 function interface:on(what, ...) |
157 if not self.listeners then | 168 if not self.listeners then |
158 log("error", "%s has no listeners", self); | 169 self:debug("Interface is missing listener callbacks"); |
159 return; | 170 return; |
160 end | 171 end |
161 local listener = self.listeners["on"..what]; | 172 local listener = self.listeners["on"..what]; |
162 if not listener then | 173 if not listener then |
163 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging | 174 -- self:debug("Missing listener 'on%s'", what); -- uncomment for development and debugging |
164 return; | 175 return; |
165 end | 176 end |
166 local ok, err = pcall(listener, self, ...); | 177 local ok, err = pcall(listener, self, ...); |
167 if not ok then | 178 if not ok then |
168 log("error", "Error calling on%s: %s", what, err); | |
169 if cfg.fatal_errors then | 179 if cfg.fatal_errors then |
170 log("debug", "Closing %s due to error in listener", self); | 180 self:debug("Closing due to error calling on%s: %s", what, err); |
171 self:destroy(); | 181 self:destroy(); |
182 else | |
183 self:debug("Error calling on%s: %s", what, err); | |
172 end | 184 end |
173 return nil, err; | 185 return nil, err; |
174 end | 186 end |
175 return err; | 187 return err; |
176 end | 188 end |
279 if r == nil then r = self._wantread; end | 291 if r == nil then r = self._wantread; end |
280 if w == nil then w = self._wantwrite; end | 292 if w == nil then w = self._wantwrite; end |
281 local ok, err, errno = poll:add(fd, r, w); | 293 local ok, err, errno = poll:add(fd, r, w); |
282 if not ok then | 294 if not ok then |
283 if errno == EEXIST then | 295 if errno == EEXIST then |
284 log("debug", "%s already registered!", self); | 296 self:debug("FD already registered in poller! (EEXIST)"); |
285 return self:set(r, w); -- So try to change its flags | 297 return self:set(r, w); -- So try to change its flags |
286 end | 298 end |
287 log("error", "Could not register %s: %s(%d)", self, err, errno); | 299 self:debug("Could not register in poller: %s(%d)", err, errno); |
288 return ok, err; | 300 return ok, err; |
289 end | 301 end |
290 self._wantread, self._wantwrite = r, w; | 302 self._wantread, self._wantwrite = r, w; |
291 fds[fd] = self; | 303 fds[fd] = self; |
292 log("debug", "Watching %s", self); | 304 self:debug("Registered in poller"); |
293 return true; | 305 return true; |
294 end | 306 end |
295 | 307 |
296 function interface:set(r, w) | 308 function interface:set(r, w) |
297 local fd = self:getfd(); | 309 local fd = self:getfd(); |
300 end | 312 end |
301 if r == nil then r = self._wantread; end | 313 if r == nil then r = self._wantread; end |
302 if w == nil then w = self._wantwrite; end | 314 if w == nil then w = self._wantwrite; end |
303 local ok, err, errno = poll:set(fd, r, w); | 315 local ok, err, errno = poll:set(fd, r, w); |
304 if not ok then | 316 if not ok then |
305 log("error", "Could not update poller state %s: %s(%d)", self, err, errno); | 317 self:debug("Could not update poller state: %s(%d)", err, errno); |
306 return ok, err; | 318 return ok, err; |
307 end | 319 end |
308 self._wantread, self._wantwrite = r, w; | 320 self._wantread, self._wantwrite = r, w; |
309 return true; | 321 return true; |
310 end | 322 end |
317 if fds[fd] ~= self then | 329 if fds[fd] ~= self then |
318 return nil, "unregistered fd"; | 330 return nil, "unregistered fd"; |
319 end | 331 end |
320 local ok, err, errno = poll:del(fd); | 332 local ok, err, errno = poll:del(fd); |
321 if not ok and errno ~= ENOENT then | 333 if not ok and errno ~= ENOENT then |
322 log("error", "Could not unregister %s: %s(%d)", self, err, errno); | 334 self:debug("Could not unregister: %s(%d)", err, errno); |
323 return ok, err; | 335 return ok, err; |
324 end | 336 end |
325 self._wantread, self._wantwrite = nil, nil; | 337 self._wantread, self._wantwrite = nil, nil; |
326 fds[fd] = nil; | 338 fds[fd] = nil; |
327 log("debug", "Unwatched %s", self); | 339 self:debug("Unregistered from poller"); |
328 return true; | 340 return true; |
329 end | 341 end |
330 | 342 |
331 function interface:setflags(r, w) | 343 function interface:setflags(r, w) |
332 if not(self._wantread or self._wantwrite) then | 344 if not(self._wantread or self._wantwrite) then |
430 -- Close, possibly after writing is done | 442 -- Close, possibly after writing is done |
431 function interface:close() | 443 function interface:close() |
432 if self.writebuffer and self.writebuffer[1] then | 444 if self.writebuffer and self.writebuffer[1] then |
433 self:set(false, true); -- Flush final buffer contents | 445 self:set(false, true); -- Flush final buffer contents |
434 self.write, self.send = noop, noop; -- No more writing | 446 self.write, self.send = noop, noop; -- No more writing |
435 log("debug", "Close %s after writing", self); | 447 self:debug("Close after writing"); |
436 self.ondrain = interface.close; | 448 self.ondrain = interface.close; |
437 else | 449 else |
438 log("debug", "Close %s now", self); | 450 self:debug("Closing now"); |
439 self.write, self.send = noop, noop; | 451 self.write, self.send = noop, noop; |
440 self.close = noop; | 452 self.close = noop; |
441 self:on("disconnect"); | 453 self:on("disconnect"); |
442 self:destroy(); | 454 self:destroy(); |
443 end | 455 end |
462 | 474 |
463 function interface:starttls(tls_ctx) | 475 function interface:starttls(tls_ctx) |
464 if tls_ctx then self.tls_ctx = tls_ctx; end | 476 if tls_ctx then self.tls_ctx = tls_ctx; end |
465 self.starttls = false; | 477 self.starttls = false; |
466 if self.writebuffer and self.writebuffer[1] then | 478 if self.writebuffer and self.writebuffer[1] then |
467 log("debug", "Start TLS on %s after write", self); | 479 self:debug("Start TLS after write"); |
468 self.ondrain = interface.starttls; | 480 self.ondrain = interface.starttls; |
469 self:set(nil, true); -- make sure wantwrite is set | 481 self:set(nil, true); -- make sure wantwrite is set |
470 else | 482 else |
471 if self.ondrain == interface.starttls then | 483 if self.ondrain == interface.starttls then |
472 self.ondrain = nil; | 484 self.ondrain = nil; |
473 end | 485 end |
474 self.onwritable = interface.tlshandskake; | 486 self.onwritable = interface.tlshandskake; |
475 self.onreadable = interface.tlshandskake; | 487 self.onreadable = interface.tlshandskake; |
476 self:set(true, true); | 488 self:set(true, true); |
477 log("debug", "Prepare to start TLS on %s", self); | 489 self:debug("Prepared to start TLS"); |
478 end | 490 end |
479 end | 491 end |
480 | 492 |
481 function interface:tlshandskake() | 493 function interface:tlshandskake() |
482 self:setwritetimeout(false); | 494 self:setwritetimeout(false); |
483 self:setreadtimeout(false); | 495 self:setreadtimeout(false); |
484 if not self._tls then | 496 if not self._tls then |
485 self._tls = true; | 497 self._tls = true; |
486 log("debug", "Start TLS on %s now", self); | 498 self:debug("Starting TLS now"); |
487 self:del(); | 499 self:del(); |
488 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); | 500 local ok, conn, err = pcall(luasec.wrap, self.conn, self.tls_ctx); |
489 if not ok then | 501 if not ok then |
490 conn, err = ok, conn; | 502 conn, err = ok, conn; |
491 log("error", "Failed to initialize TLS: %s", err); | 503 self:debug("Failed to initialize TLS: %s", err); |
492 end | 504 end |
493 if not conn then | 505 if not conn then |
494 self:on("disconnect", err); | 506 self:on("disconnect", err); |
495 self:destroy(); | 507 self:destroy(); |
496 return conn, err; | 508 return conn, err; |
510 self.onreadable = interface.tlshandskake; | 522 self.onreadable = interface.tlshandskake; |
511 return self:init(); | 523 return self:init(); |
512 end | 524 end |
513 local ok, err = self.conn:dohandshake(); | 525 local ok, err = self.conn:dohandshake(); |
514 if ok then | 526 if ok then |
515 log("debug", "TLS handshake on %s complete", self); | 527 self:debug("TLS handshake complete"); |
516 self.onwritable = nil; | 528 self.onwritable = nil; |
517 self.onreadable = nil; | 529 self.onreadable = nil; |
518 self:on("status", "ssl-handshake-complete"); | 530 self:on("status", "ssl-handshake-complete"); |
519 self:setwritetimeout(); | 531 self:setwritetimeout(); |
520 self:set(true, true); | 532 self:set(true, true); |
521 elseif err == "wantread" then | 533 elseif err == "wantread" then |
522 log("debug", "TLS handshake on %s to wait until readable", self); | 534 self:debug("TLS handshake to wait until readable"); |
523 self:set(true, false); | 535 self:set(true, false); |
524 self:setreadtimeout(cfg.ssl_handshake_timeout); | 536 self:setreadtimeout(cfg.ssl_handshake_timeout); |
525 elseif err == "wantwrite" then | 537 elseif err == "wantwrite" then |
526 log("debug", "TLS handshake on %s to wait until writable", self); | 538 self:debug("TLS handshake to wait until writable"); |
527 self:set(false, true); | 539 self:set(false, true); |
528 self:setwritetimeout(cfg.ssl_handshake_timeout); | 540 self:setwritetimeout(cfg.ssl_handshake_timeout); |
529 else | 541 else |
530 log("debug", "TLS handshake error on %s: %s", self, err); | 542 self:debug("TLS handshake error: %s", err); |
531 self:on("disconnect", err); | 543 self:on("disconnect", err); |
532 self:destroy(); | 544 self:destroy(); |
533 end | 545 end |
534 end | 546 end |
535 | 547 |
542 listeners = listeners; | 554 listeners = listeners; |
543 read_size = read_size or (server and server.read_size); | 555 read_size = read_size or (server and server.read_size); |
544 writebuffer = {}; | 556 writebuffer = {}; |
545 tls_ctx = tls_ctx or (server and server.tls_ctx); | 557 tls_ctx = tls_ctx or (server and server.tls_ctx); |
546 tls_direct = server and server.tls_direct; | 558 tls_direct = server and server.tls_direct; |
559 log = logger.init(("conn%s"):format(new_id())); | |
547 }, interface_mt); | 560 }, interface_mt); |
548 | 561 |
549 conn:updatenames(); | 562 conn:updatenames(); |
550 return conn; | 563 return conn; |
551 end | 564 end |
565 -- A server interface has new incoming connections waiting | 578 -- A server interface has new incoming connections waiting |
566 -- This replaces the onreadable callback | 579 -- This replaces the onreadable callback |
567 function interface:onacceptable() | 580 function interface:onacceptable() |
568 local conn, err = self.conn:accept(); | 581 local conn, err = self.conn:accept(); |
569 if not conn then | 582 if not conn then |
570 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); | 583 self:debug("Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); |
571 self:pausefor(cfg.accept_retry_interval); | 584 self:pausefor(cfg.accept_retry_interval); |
572 return; | 585 return; |
573 end | 586 end |
574 local client = wrapsocket(conn, self, nil, self.listeners); | 587 local client = wrapsocket(conn, self, nil, self.listeners); |
575 log("debug", "New connection %s", tostring(client)); | 588 client:debug("New connection %s on server %s", client, self); |
576 client:init(); | 589 client:init(); |
577 if self.tls_direct then | 590 if self.tls_direct then |
578 client:starttls(self.tls_ctx); | 591 client:starttls(self.tls_ctx); |
579 else | 592 else |
580 client:onconnect(); | 593 client:onconnect(); |
645 tls_ctx = config and config.tls_ctx; | 658 tls_ctx = config and config.tls_ctx; |
646 tls_direct = config and config.tls_direct; | 659 tls_direct = config and config.tls_direct; |
647 hosts = config and config.sni_hosts; | 660 hosts = config and config.sni_hosts; |
648 sockname = addr; | 661 sockname = addr; |
649 sockport = port; | 662 sockport = port; |
663 log = logger.init(("serv%s"):format(new_id())); | |
650 }, interface_mt); | 664 }, interface_mt); |
665 server:debug("Server %s created", server); | |
651 server:add(true, false); | 666 server:add(true, false); |
652 return server; | 667 return server; |
653 end | 668 end |
654 | 669 |
655 -- COMPAT | 670 -- COMPAT |
703 local ok, err = client:init(); | 718 local ok, err = client:init(); |
704 if not ok then return ok, err; end | 719 if not ok then return ok, err; end |
705 if tls_ctx then | 720 if tls_ctx then |
706 client:starttls(tls_ctx); | 721 client:starttls(tls_ctx); |
707 end | 722 end |
723 client:debug("Client %s created", client); | |
708 return client, conn; | 724 return client, conn; |
709 end | 725 end |
710 | 726 |
711 local function watchfd(fd, onreadable, onwritable) | 727 local function watchfd(fd, onreadable, onwritable) |
712 local conn = setmetatable({ | 728 local conn = setmetatable({ |
721 conn.getfd = function () | 737 conn.getfd = function () |
722 return fd; | 738 return fd; |
723 end; | 739 end; |
724 -- Otherwise it'll need to be something LuaSocket-compatible | 740 -- Otherwise it'll need to be something LuaSocket-compatible |
725 end | 741 end |
742 conn.log = logger.init(("fdwatch%s"):format(new_id())); | |
726 conn:add(onreadable, onwritable); | 743 conn:add(onreadable, onwritable); |
727 return conn; | 744 return conn; |
728 end; | 745 end; |
729 | 746 |
730 -- Dump all data from one connection into another | 747 -- Dump all data from one connection into another |
831 close = function (self) | 848 close = function (self) |
832 self:del(); | 849 self:del(); |
833 fds[fd] = nil; | 850 fds[fd] = nil; |
834 end; | 851 end; |
835 }, interface_mt); | 852 }, interface_mt); |
853 conn.log = logger.init(("fdwatch%d"):format(conn:getfd())); | |
836 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); | 854 local ok, err = conn:add(mode == "r" or mode == "rw", mode == "w" or mode == "rw"); |
837 if not ok then return ok, err; end | 855 if not ok then return ok, err; end |
838 return conn; | 856 return conn; |
839 end; | 857 end; |
840 }; | 858 }; |