Comparison

net/server_event.lua @ 6054:7a5ddbaf758d

Merge 0.9->0.10
author Matthew Wild <mwild1@gmail.com>
date Wed, 02 Apr 2014 17:41:38 +0100
parent 6047:4db0403bfc0d
parent 5776:bd0ff8ae98a8
child 6055:596539a30e9b
child 6315:7a3e2f2d43fc
comparison
equal deleted inserted replaced
6053:2f93a04564b2 6054:7a5ddbaf758d
113 113
114 -- Client interface methods 114 -- Client interface methods
115 local interface_mt 115 local interface_mt
116 do 116 do
117 interface_mt = {}; interface_mt.__index = interface_mt; 117 interface_mt = {}; interface_mt.__index = interface_mt;
118 118
119 local addevent = base.addevent 119 local addevent = base.addevent
120 local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield 120 local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield
121 121
122 -- Private methods 122 -- Private methods
123 function interface_mt:_position(new_position) 123 function interface_mt:_position(new_position)
124 self.position = new_position or self.position 124 self.position = new_position or self.position
125 return self.position; 125 return self.position;
126 end 126 end
127 function interface_mt:_close() 127 function interface_mt:_close()
128 return self:_destroy(); 128 return self:_destroy();
129 end 129 end
130 130
131 function interface_mt:_start_connection(plainssl) -- should be called from addclient 131 function interface_mt:_start_connection(plainssl) -- should be called from addclient
132 local callback = function( event ) 132 local callback = function( event )
133 if EV_TIMEOUT == event then -- timeout during connection 133 if EV_TIMEOUT == event then -- timeout during connection
134 self.fatalerror = "connection timeout" 134 self.fatalerror = "connection timeout"
135 self:ontimeout() -- call timeout listener 135 self:ontimeout() -- call timeout listener
266 self.interface, self.readcallback = nil, nil 266 self.interface, self.readcallback = nil, nil
267 end 267 end
268 interfacelist( "delete", self ) 268 interfacelist( "delete", self )
269 return true 269 return true
270 end 270 end
271 271
272 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events 272 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events
273 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting 273 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting
274 return nointerface, noreading, nowriting 274 return nointerface, noreading, nowriting
275 end 275 end
276 276
277 --TODO: Deprecate 277 --TODO: Deprecate
278 function interface_mt:lock_read(switch) 278 function interface_mt:lock_read(switch)
279 if switch then 279 if switch then
280 return self:pause(); 280 return self:pause();
281 else 281 else
298 if c then 298 if c then
299 self._connections = self._connections + c 299 self._connections = self._connections + c
300 end 300 end
301 return self._connections 301 return self._connections
302 end 302 end
303 303
304 -- Public methods 304 -- Public methods
305 function interface_mt:write(data) 305 function interface_mt:write(data)
306 if self.nowriting then return nil, "locked" end 306 if self.nowriting then return nil, "locked" end
307 --vdebug( "try to send data to client, id/data:", self.id, data ) 307 --vdebug( "try to send data to client, id/data:", self.id, data )
308 data = tostring( data ) 308 data = tostring( data )
341 self:_lock( true ) 341 self:_lock( true )
342 self:_close( 0 ) 342 self:_close( 0 )
343 return true 343 return true
344 end 344 end
345 end 345 end
346 346
347 function interface_mt:socket() 347 function interface_mt:socket()
348 return self.conn 348 return self.conn
349 end 349 end
350 350
351 function interface_mt:server() 351 function interface_mt:server()
352 return self._server or self; 352 return self._server or self;
353 end 353 end
354 354
355 function interface_mt:port() 355 function interface_mt:port()
356 return self._port 356 return self._port
357 end 357 end
358 358
359 function interface_mt:serverport() 359 function interface_mt:serverport()
360 return self._serverport 360 return self._serverport
361 end 361 end
362 362
363 function interface_mt:ip() 363 function interface_mt:ip()
364 return self._ip 364 return self._ip
365 end 365 end
366 366
367 function interface_mt:ssl() 367 function interface_mt:ssl()
368 return self._usingssl 368 return self._usingssl
369 end 369 end
370 interface_mt.clientport = interface_mt.port -- COMPAT server_select 370 interface_mt.clientport = interface_mt.port -- COMPAT server_select
371 371
372 function interface_mt:type() 372 function interface_mt:type()
373 return self._type or "client" 373 return self._type or "client"
374 end 374 end
375 375
376 function interface_mt:connections() 376 function interface_mt:connections()
377 return self._connections 377 return self._connections
378 end 378 end
379 379
380 function interface_mt:address() 380 function interface_mt:address()
381 return self.addr 381 return self.addr
382 end 382 end
383 383
384 function interface_mt:set_sslctx(sslctx) 384 function interface_mt:set_sslctx(sslctx)
385 self._sslctx = sslctx; 385 self._sslctx = sslctx;
386 if sslctx then 386 if sslctx then
387 self.starttls = nil; -- use starttls() of interface_mt 387 self.starttls = nil; -- use starttls() of interface_mt
388 else 388 else
394 if pattern then 394 if pattern then
395 self._pattern = pattern; 395 self._pattern = pattern;
396 end 396 end
397 return self._pattern; 397 return self._pattern;
398 end 398 end
399 399
400 function interface_mt:set_send(new_send) 400 function interface_mt:set_send(new_send)
401 -- No-op, we always use the underlying connection's send 401 -- No-op, we always use the underlying connection's send
402 end 402 end
403 403
404 function interface_mt:starttls(sslctx, call_onconnect) 404 function interface_mt:starttls(sslctx, call_onconnect)
405 debug( "try to start ssl at client id:", self.id ) 405 debug( "try to start ssl at client id:", self.id )
406 local err 406 local err
407 self._sslctx = sslctx; 407 self._sslctx = sslctx;
408 if self._usingssl then -- startssl was already called 408 if self._usingssl then -- startssl was already called
427 debug "ssl session delayed until writebuffer is empty..." 427 debug "ssl session delayed until writebuffer is empty..."
428 end 428 end
429 self.starttls = false; 429 self.starttls = false;
430 return true 430 return true
431 end 431 end
432 432
433 function interface_mt:setoption(option, value) 433 function interface_mt:setoption(option, value)
434 if self.conn.setoption then 434 if self.conn.setoption then
435 return self.conn:setoption(option, value); 435 return self.conn:setoption(option, value);
436 end 436 end
437 return false, "setoption not implemented"; 437 return false, "setoption not implemented";
438 end 438 end
439 439
440 function interface_mt:setlistener(listener) 440 function interface_mt:setlistener(listener)
441 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onstatus 441 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus
442 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onstatus; 442 = listener.onconnect, listener.ondisconnect, listener.onincoming,
443 end 443 listener.ontimeout, listener.onreadtimeout, listener.onstatus;
444 444 end
445
445 -- Stub handlers 446 -- Stub handlers
446 function interface_mt:onconnect() 447 function interface_mt:onconnect()
447 end 448 end
448 function interface_mt:onincoming() 449 function interface_mt:onincoming()
449 end 450 end
450 function interface_mt:ondisconnect() 451 function interface_mt:ondisconnect()
451 end 452 end
452 function interface_mt:ontimeout() 453 function interface_mt:ontimeout()
454 end
455 function interface_mt:onreadtimeout()
456 self.fatalerror = "timeout during receiving"
457 debug( "connection failed:", self.fatalerror )
458 self:_close()
459 self.eventread = nil
453 end 460 end
454 function interface_mt:ondrain() 461 function interface_mt:ondrain()
455 end 462 end
456 function interface_mt:onstatus() 463 function interface_mt:onstatus()
457 end 464 end
476 receive = client.receive; 483 receive = client.receive;
477 onconnect = listener.onconnect; -- will be called when client disconnects 484 onconnect = listener.onconnect; -- will be called when client disconnects
478 ondisconnect = listener.ondisconnect; -- will be called when client disconnects 485 ondisconnect = listener.ondisconnect; -- will be called when client disconnects
479 onincoming = listener.onincoming; -- will be called when client sends data 486 onincoming = listener.onincoming; -- will be called when client sends data
480 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs 487 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs
488 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs
481 ondrain = listener.ondrain; -- called when writebuffer is empty 489 ondrain = listener.ondrain; -- called when writebuffer is empty
482 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) 490 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS)
483 eventread = false, eventwrite = false, eventclose = false, 491 eventread = false, eventwrite = false, eventclose = false,
484 eventhandshake = false, eventstarthandshake = false; -- event handler 492 eventhandshake = false, eventstarthandshake = false; -- event handler
485 eventconnect = false, eventsession = false; -- more event handler... 493 eventconnect = false, eventsession = false; -- more event handler...
490 readcallback = false; -- will be called on read events 498 readcallback = false; -- will be called on read events
491 nointerface = true; -- lock/unlock parameter of this interface 499 nointerface = true; -- lock/unlock parameter of this interface
492 noreading = false, nowriting = false; -- locks of the read/writecallback 500 noreading = false, nowriting = false; -- locks of the read/writecallback
493 startsslcallback = false; -- starting handshake callback 501 startsslcallback = false; -- starting handshake callback
494 position = false; -- position of client in interfacelist 502 position = false; -- position of client in interfacelist
495 503
496 -- Properties 504 -- Properties
497 _ip = ip, _port = port, _server = server, _pattern = pattern, 505 _ip = ip, _port = port, _server = server, _pattern = pattern,
498 _serverport = (server and server:port() or nil), 506 _serverport = (server and server:port() or nil),
499 _sslctx = sslctx; -- parameters 507 _sslctx = sslctx; -- parameters
500 _usingssl = false; -- client is using ssl; 508 _usingssl = false; -- client is using ssl;
566 interface.eventwrite = nil 574 interface.eventwrite = nil
567 return -1 575 return -1
568 end 576 end
569 end 577 end
570 end 578 end
571 579
572 interface.readcallback = function( event ) -- called on read events 580 interface.readcallback = function( event ) -- called on read events
573 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) 581 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) )
574 if interface.noreading or interface.fatalerror then -- leave this event 582 if interface.noreading or interface.fatalerror then -- leave this event
575 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) 583 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) )
576 interface.eventread = nil 584 interface.eventread = nil
577 return -1 585 return -1
578 end 586 end
579 if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect 587 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then
580 interface.fatalerror = "timeout during receiving" 588 return -1 -- took too long to get some data from client -> disconnect
581 debug( "connection failed:", interface.fatalerror ) 589 end
590 if interface._usingssl then -- handle luasec
591 if interface.eventwritetimeout then -- ok, in the past writecallback was regged
592 local ret = interface.writecallback( ) -- call it
593 --vdebug( "tried to write in readcallback, result:", tostring(ret) )
594 end
595 if interface.eventreadtimeout then
596 interface.eventreadtimeout:close( )
597 interface.eventreadtimeout = nil
598 end
599 end
600 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern"
601 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) )
602 buffer = buffer or part
603 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length
604 interface.fatalerror = "receive buffer exceeded"
605 debug( "fatal error:", interface.fatalerror )
582 interface:_close() 606 interface:_close()
583 interface.eventread = nil 607 interface.eventread = nil
584 return -1 608 return -1
585 else -- can read 609 end
586 if interface._usingssl then -- handle luasec 610 if err and ( err ~= "timeout" and err ~= "wantread" ) then
587 if interface.eventwritetimeout then -- ok, in the past writecallback was regged 611 if "wantwrite" == err then -- need to read on write event
588 local ret = interface.writecallback( ) -- call it 612 if not interface.eventwrite then -- register new write event if needed
589 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) 613 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT )
590 end 614 end
591 if interface.eventreadtimeout then 615 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT,
592 interface.eventreadtimeout:close( ) 616 function( )
593 interface.eventreadtimeout = nil 617 interface:_close()
594 end 618 end, cfg.READ_TIMEOUT
595 end 619 )
596 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" 620 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
597 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) 621 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
598 buffer = buffer or part 622 else -- connection was closed or fatal error
599 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length 623 interface.fatalerror = err
600 interface.fatalerror = "receive buffer exceeded" 624 debug( "connection failed in read event:", interface.fatalerror )
601 debug( "fatal error:", interface.fatalerror )
602 interface:_close() 625 interface:_close()
603 interface.eventread = nil 626 interface.eventread = nil
604 return -1 627 return -1
605 end 628 end
606 if err and ( err ~= "timeout" and err ~= "wantread" ) then 629 else
607 if "wantwrite" == err then -- need to read on write event 630 interface.onincoming( interface, buffer, err ) -- send new data to listener
608 if not interface.eventwrite then -- register new write event if needed 631 end
609 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) 632 if interface.noreading then
610 end 633 interface.eventread = nil;
611 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, 634 return -1;
612 function( ) 635 end
613 interface:_close() 636 return EV_READ, cfg.READ_TIMEOUT
614 end, cfg.READ_TIMEOUT
615 )
616 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." )
617 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc...
618 else -- connection was closed or fatal error
619 interface.fatalerror = err
620 debug( "connection failed in read event:", interface.fatalerror )
621 interface:_close()
622 interface.eventread = nil
623 return -1
624 end
625 else
626 interface.onincoming( interface, buffer, err ) -- send new data to listener
627 end
628 if interface.noreading then
629 interface.eventread = nil;
630 return -1;
631 end
632 return EV_READ, cfg.READ_TIMEOUT
633 end
634 end 637 end
635 638
636 client:settimeout( 0 ) -- set non blocking 639 client:settimeout( 0 ) -- set non blocking
637 setmetatable(interface, interface_mt) 640 setmetatable(interface, interface_mt)
638 interfacelist( "add", interface ) -- add to interfacelist 641 interfacelist( "add", interface ) -- add to interfacelist
644 do 647 do
645 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface 648 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface
646 debug "creating server interface..." 649 debug "creating server interface..."
647 local interface = { 650 local interface = {
648 _connections = 0; 651 _connections = 0;
649 652
650 conn = server; 653 conn = server;
651 onconnect = listener.onconnect; -- will be called when new client connected 654 onconnect = listener.onconnect; -- will be called when new client connected
652 eventread = false; -- read event handler 655 eventread = false; -- read event handler
653 eventclose = false; -- close event handler 656 eventclose = false; -- close event handler
654 readcallback = false; -- read event callback 657 readcallback = false; -- read event callback
655 fatalerror = false; -- error message 658 fatalerror = false; -- error message
656 nointerface = true; -- lock/unlock parameter 659 nointerface = true; -- lock/unlock parameter
657 660
658 _ip = addr, _port = port, _pattern = pattern, 661 _ip = addr, _port = port, _pattern = pattern,
659 _sslctx = sslctx; 662 _sslctx = sslctx;
660 } 663 }
661 interface.id = tostring(interface):match("%x+$"); 664 interface.id = tostring(interface):match("%x+$");
662 interface.readcallback = function( event ) -- server handler, called on incoming connections 665 interface.readcallback = function( event ) -- server handler, called on incoming connections
691 clientinterface:starttls(sslctx, true) 694 clientinterface:starttls(sslctx, true)
692 else 695 else
693 clientinterface:_start_session( true ) 696 clientinterface:_start_session( true )
694 end 697 end
695 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); 698 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>");
696 699
697 client, err = server:accept() -- try to accept again 700 client, err = server:accept() -- try to accept again
698 end 701 end
699 return EV_READ 702 return EV_READ
700 end 703 end
701 704
702 server:settimeout( 0 ) 705 server:settimeout( 0 )
703 setmetatable(interface, interface_mt) 706 setmetatable(interface, interface_mt)
704 interfacelist( "add", interface ) 707 interfacelist( "add", interface )
705 interface:_start_session() 708 interface:_start_session()
706 return interface 709 return interface
739 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) 742 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx )
740 interface:_start_connection(sslctx) 743 interface:_start_connection(sslctx)
741 return interface, client 744 return interface, client
742 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface 745 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface
743 end 746 end
744 747
745 function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) 748 function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl )
746 local client, err = socket.tcp() -- creating new socket 749 local client, err = socket.tcp() -- creating new socket
747 if not client then 750 if not client then
748 debug( "cannot create socket:", err ) 751 debug( "cannot create socket:", err )
749 return nil, err 752 return nil, err
830 return signal_events[signal_num]; 833 return signal_events[signal_num];
831 end 834 end
832 835
833 local function link(sender, receiver, buffersize) 836 local function link(sender, receiver, buffersize)
834 local sender_locked; 837 local sender_locked;
835 838
836 function receiver:ondrain() 839 function receiver:ondrain()
837 if sender_locked then 840 if sender_locked then
838 sender:resume(); 841 sender:resume();
839 sender_locked = nil; 842 sender_locked = nil;
840 end 843 end
841 end 844 end
842 845
843 function sender:onincoming(data) 846 function sender:onincoming(data)
844 receiver:write(data); 847 receiver:write(data);
845 if receiver.writebufferlen >= buffersize then 848 if receiver.writebufferlen >= buffersize then
846 sender_locked = true; 849 sender_locked = true;
847 sender:pause(); 850 sender:pause();