Comparison

net/server_event.lua @ 11120:b2331f3dfeea

Merge 0.11->trunk
author Matthew Wild <mwild1@gmail.com>
date Wed, 30 Sep 2020 09:50:33 +0100
parent 11068:988ddd57e851
child 11741:dcf38ac6a38c
comparison
equal deleted inserted replaced
11119:68df52bf08d5 11120:b2331f3dfeea
163 self:_close() 163 self:_close()
164 debug( "fatal error while ssl wrapping:", err ) 164 debug( "fatal error while ssl wrapping:", err )
165 return false 165 return false
166 end 166 end
167 167
168 if self.conn.sni and self.servername then 168 if self.conn.sni then
169 self.conn:sni(self.servername); 169 if self.servername then
170 self.conn:sni(self.servername);
171 elseif self._server and type(self._server.hosts) == "table" and next(self._server.hosts) ~= nil then
172 self.conn:sni(self._server.hosts, true);
173 end
170 end 174 end
171 175
172 self.conn:settimeout( 0 ) -- set non blocking 176 self.conn:settimeout( 0 ) -- set non blocking
173 local handshakecallback = coroutine_wrap(function( event ) 177 local handshakecallback = coroutine_wrap(function( event )
174 local _, err 178 local _, err
256 return nointerface, noreading, nowriting 260 return nointerface, noreading, nowriting
257 end 261 end
258 262
259 --TODO: Deprecate 263 --TODO: Deprecate
260 function interface_mt:lock_read(switch) 264 function interface_mt:lock_read(switch)
265 log("warn", ":lock_read is deprecated, use :pasue() and :resume()");
261 if switch then 266 if switch then
262 return self:pause(); 267 return self:pause();
263 else 268 else
264 return self:resume(); 269 return self:resume();
265 end 270 end
275 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback 280 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback
276 return true; 281 return true;
277 end 282 end
278 end 283 end
279 284
285 function interface_mt:pause_writes()
286 return self:_lock(self.nointerface, self.noreading, true);
287 end
288
289 function interface_mt:resume_writes()
290 self:_lock(self.nointerface, self.noreading, false);
291 if self.writecallback and not self.eventwrite then
292 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ); -- register callback
293 return true;
294 end
295 end
296
297
280 function interface_mt:counter(c) 298 function interface_mt:counter(c)
281 if c then 299 if c then
282 self._connections = self._connections + c 300 self._connections = self._connections + c
283 end 301 end
284 return self._connections 302 return self._connections
285 end 303 end
286 304
287 -- Public methods 305 -- Public methods
288 function interface_mt:write(data) 306 function interface_mt:write(data)
289 if self.nowriting then return nil, "locked" end 307 if self.nointerface then return nil, "locked"; end
290 --vdebug( "try to send data to client, id/data:", self.id, data ) 308 --vdebug( "try to send data to client, id/data:", self.id, data )
291 data = tostring( data ) 309 data = tostring( data )
292 local len = #data 310 local len = #data
293 local total = len + self.writebufferlen 311 local total = len + self.writebufferlen
294 if total > cfg.MAX_SEND_LENGTH then -- check buffer length 312 if total > cfg.MAX_SEND_LENGTH then -- check buffer length
296 debug( "error:", err ) -- to much, check your app 314 debug( "error:", err ) -- to much, check your app
297 return nil, err 315 return nil, err
298 end 316 end
299 t_insert(self.writebuffer, data) -- new buffer 317 t_insert(self.writebuffer, data) -- new buffer
300 self.writebufferlen = total 318 self.writebufferlen = total
301 if not self.eventwrite then -- register new write event 319 if not self.eventwrite and not self.nowriting then -- register new write event
302 --vdebug( "register new write event" ) 320 --vdebug( "register new write event" )
303 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) 321 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
304 end 322 end
305 return true 323 return true
306 end 324 end
443 function interface_mt:ondisconnect() 461 function interface_mt:ondisconnect()
444 end 462 end
445 function interface_mt:ontimeout() 463 function interface_mt:ontimeout()
446 end 464 end
447 function interface_mt:onreadtimeout() 465 function interface_mt:onreadtimeout()
448 self.fatalerror = "timeout during receiving"
449 debug( "connection failed:", self.fatalerror )
450 self:_close()
451 self.eventread = nil
452 end 466 end
453 function interface_mt:ondrain() 467 function interface_mt:ondrain()
454 end 468 end
455 function interface_mt:ondetach() 469 function interface_mt:ondetach()
456 end 470 end
640 setmetatable(interface, interface_mt) 654 setmetatable(interface, interface_mt)
641 interfacelist[ interface ] = true -- add to interfacelist 655 interfacelist[ interface ] = true -- add to interfacelist
642 return interface 656 return interface
643 end 657 end
644 658
645 local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface 659 local function handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- creates a server interface
646 debug "creating server interface..." 660 debug "creating server interface..."
647 local interface = { 661 local interface = {
648 _connections = 0; 662 _connections = 0;
649 663
650 type = "server"; 664 type = "server";
656 fatalerror = false; -- error message 670 fatalerror = false; -- error message
657 nointerface = true; -- lock/unlock parameter 671 nointerface = true; -- lock/unlock parameter
658 672
659 _ip = addr, _port = port, _pattern = pattern, 673 _ip = addr, _port = port, _pattern = pattern,
660 _sslctx = sslctx; 674 _sslctx = sslctx;
675 hosts = {};
661 } 676 }
662 interface.id = tostring(interface):match("%x+$"); 677 interface.id = tostring(interface):match("%x+$");
663 interface.readcallback = function( event ) -- server handler, called on incoming connections 678 interface.readcallback = function( event ) -- server handler, called on incoming connections
664 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) 679 --vdebug( "server can accept, id/addr/port:", interface, addr, port )
665 if interface.fatalerror then 680 if interface.fatalerror then
675 else 690 else
676 return EV_READ -- accept again 691 return EV_READ -- accept again
677 end 692 end
678 end 693 end
679 --vdebug("max connection check ok, accepting...") 694 --vdebug("max connection check ok, accepting...")
695 -- luacheck: ignore 231/err
680 local client, err = server:accept() -- try to accept; TODO: check err 696 local client, err = server:accept() -- try to accept; TODO: check err
681 while client do 697 while client do
682 if interface._connections >= cfg.MAX_CONNECTIONS then 698 if interface._connections >= cfg.MAX_CONNECTIONS then
683 client:close( ) -- refuse connection 699 client:close( ) -- refuse connection
684 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) 700 debug( "maximal connections reached, refuse client connection; accept delay:", delay )
686 end 702 end
687 local client_ip, client_port = client:getpeername( ) 703 local client_ip, client_port = client:getpeername( )
688 interface._connections = interface._connections + 1 -- increase connection count 704 interface._connections = interface._connections + 1 -- increase connection count
689 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) 705 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx )
690 --vdebug( "client id:", clientinterface, "startssl:", startssl ) 706 --vdebug( "client id:", clientinterface, "startssl:", startssl )
691 if has_luasec and sslctx then 707 if has_luasec and startssl then
692 clientinterface:starttls(sslctx, true) 708 clientinterface:starttls(sslctx, true)
693 else 709 else
694 clientinterface:_start_session( true ) 710 clientinterface:_start_session( true )
695 end 711 end
696 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); 712 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
705 interfacelist[ interface ] = true 721 interfacelist[ interface ] = true
706 interface:_start_session() 722 interface:_start_session()
707 return interface 723 return interface
708 end 724 end
709 725
710 local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments 726 local function listen(addr, port, listener, config)
711 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") 727 config = config or {}
712 if sslctx and not has_luasec then 728 if config.sslctx and not has_luasec then
713 debug "fatal error: luasec not found" 729 debug "fatal error: luasec not found"
714 return nil, "luasec not found" 730 return nil, "luasec not found"
715 end 731 end
716 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket 732 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket
717 if not server then 733 if not server then
718 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) 734 debug( "creating server socket on "..addr.." port "..port.." failed:", err )
719 return nil, err 735 return nil, err
720 end 736 end
721 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler 737 local interface = handleserver( server, addr, port, config.read_size, listener, config.tls_ctx, config.tls_direct) -- new server handler
722 debug( "new server created with id:", tostring(interface)) 738 debug( "new server created with id:", tostring(interface))
723 return interface 739 return interface
740 end
741
742 local function addserver( addr, port, listener, pattern, sslctx ) -- TODO: check arguments
743 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil")
744 return listen( addr, port, listener, {
745 read_size = pattern,
746 tls_ctx = sslctx,
747 tls_direct = not not sslctx,
748 });
724 end 749 end
725 750
726 local function wrapclient( client, ip, port, listeners, pattern, sslctx, extra ) 751 local function wrapclient( client, ip, port, listeners, pattern, sslctx, extra )
727 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx, extra ) 752 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx, extra )
728 interface:_start_connection(sslctx) 753 interface:_start_connection(sslctx)
754 return nil, err 779 return nil, err
755 end 780 end
756 client:settimeout( 0 ) -- set nonblocking 781 client:settimeout( 0 ) -- set nonblocking
757 local res, err = client:setpeername( addr, serverport ) -- connect 782 local res, err = client:setpeername( addr, serverport ) -- connect
758 if res or ( err == "timeout" ) then 783 if res or ( err == "timeout" ) then
784 -- luacheck: ignore 211/port
759 local ip, port = client:getsockname( ) 785 local ip, port = client:getsockname( )
760 local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, extra ) 786 local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx, extra )
761 debug( "new connection id:", interface.id ) 787 debug( "new connection id:", interface.id )
762 return interface, err 788 return interface, err
763 else 789 else
881 link = link, 907 link = link,
882 event = levent, 908 event = levent,
883 event_base = base, 909 event_base = base,
884 addevent = newevent, 910 addevent = newevent,
885 addserver = addserver, 911 addserver = addserver,
912 listen = listen,
886 addclient = addclient, 913 addclient = addclient,
887 wrapclient = wrapclient, 914 wrapclient = wrapclient,
888 setquitting = setquitting, 915 setquitting = setquitting,
889 closeall = closeallservers, 916 closeall = closeallservers,
890 get_backend = get_backend, 917 get_backend = get_backend,