Software /
code /
prosody
Comparison
net/server_event.lua @ 6054:7a5ddbaf758d
Merge 0.9->0.10
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 02 Apr 2014 17:41:38 +0100 |
parent | 6047:4db0403bfc0d |
parent | 5776:bd0ff8ae98a8 |
child | 6055:596539a30e9b |
child | 6315:7a3e2f2d43fc |
comparison
equal
deleted
inserted
replaced
6053:2f93a04564b2 | 6054:7a5ddbaf758d |
---|---|
113 | 113 |
114 -- Client interface methods | 114 -- Client interface methods |
115 local interface_mt | 115 local interface_mt |
116 do | 116 do |
117 interface_mt = {}; interface_mt.__index = interface_mt; | 117 interface_mt = {}; interface_mt.__index = interface_mt; |
118 | 118 |
119 local addevent = base.addevent | 119 local addevent = base.addevent |
120 local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield | 120 local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield |
121 | 121 |
122 -- Private methods | 122 -- Private methods |
123 function interface_mt:_position(new_position) | 123 function interface_mt:_position(new_position) |
124 self.position = new_position or self.position | 124 self.position = new_position or self.position |
125 return self.position; | 125 return self.position; |
126 end | 126 end |
127 function interface_mt:_close() | 127 function interface_mt:_close() |
128 return self:_destroy(); | 128 return self:_destroy(); |
129 end | 129 end |
130 | 130 |
131 function interface_mt:_start_connection(plainssl) -- should be called from addclient | 131 function interface_mt:_start_connection(plainssl) -- should be called from addclient |
132 local callback = function( event ) | 132 local callback = function( event ) |
133 if EV_TIMEOUT == event then -- timeout during connection | 133 if EV_TIMEOUT == event then -- timeout during connection |
134 self.fatalerror = "connection timeout" | 134 self.fatalerror = "connection timeout" |
135 self:ontimeout() -- call timeout listener | 135 self:ontimeout() -- call timeout listener |
266 self.interface, self.readcallback = nil, nil | 266 self.interface, self.readcallback = nil, nil |
267 end | 267 end |
268 interfacelist( "delete", self ) | 268 interfacelist( "delete", self ) |
269 return true | 269 return true |
270 end | 270 end |
271 | 271 |
272 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | 272 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events |
273 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | 273 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting |
274 return nointerface, noreading, nowriting | 274 return nointerface, noreading, nowriting |
275 end | 275 end |
276 | 276 |
277 --TODO: Deprecate | 277 --TODO: Deprecate |
278 function interface_mt:lock_read(switch) | 278 function interface_mt:lock_read(switch) |
279 if switch then | 279 if switch then |
280 return self:pause(); | 280 return self:pause(); |
281 else | 281 else |
298 if c then | 298 if c then |
299 self._connections = self._connections + c | 299 self._connections = self._connections + c |
300 end | 300 end |
301 return self._connections | 301 return self._connections |
302 end | 302 end |
303 | 303 |
304 -- Public methods | 304 -- Public methods |
305 function interface_mt:write(data) | 305 function interface_mt:write(data) |
306 if self.nowriting then return nil, "locked" end | 306 if self.nowriting then return nil, "locked" end |
307 --vdebug( "try to send data to client, id/data:", self.id, data ) | 307 --vdebug( "try to send data to client, id/data:", self.id, data ) |
308 data = tostring( data ) | 308 data = tostring( data ) |
341 self:_lock( true ) | 341 self:_lock( true ) |
342 self:_close( 0 ) | 342 self:_close( 0 ) |
343 return true | 343 return true |
344 end | 344 end |
345 end | 345 end |
346 | 346 |
347 function interface_mt:socket() | 347 function interface_mt:socket() |
348 return self.conn | 348 return self.conn |
349 end | 349 end |
350 | 350 |
351 function interface_mt:server() | 351 function interface_mt:server() |
352 return self._server or self; | 352 return self._server or self; |
353 end | 353 end |
354 | 354 |
355 function interface_mt:port() | 355 function interface_mt:port() |
356 return self._port | 356 return self._port |
357 end | 357 end |
358 | 358 |
359 function interface_mt:serverport() | 359 function interface_mt:serverport() |
360 return self._serverport | 360 return self._serverport |
361 end | 361 end |
362 | 362 |
363 function interface_mt:ip() | 363 function interface_mt:ip() |
364 return self._ip | 364 return self._ip |
365 end | 365 end |
366 | 366 |
367 function interface_mt:ssl() | 367 function interface_mt:ssl() |
368 return self._usingssl | 368 return self._usingssl |
369 end | 369 end |
370 interface_mt.clientport = interface_mt.port -- COMPAT server_select | 370 interface_mt.clientport = interface_mt.port -- COMPAT server_select |
371 | 371 |
372 function interface_mt:type() | 372 function interface_mt:type() |
373 return self._type or "client" | 373 return self._type or "client" |
374 end | 374 end |
375 | 375 |
376 function interface_mt:connections() | 376 function interface_mt:connections() |
377 return self._connections | 377 return self._connections |
378 end | 378 end |
379 | 379 |
380 function interface_mt:address() | 380 function interface_mt:address() |
381 return self.addr | 381 return self.addr |
382 end | 382 end |
383 | 383 |
384 function interface_mt:set_sslctx(sslctx) | 384 function interface_mt:set_sslctx(sslctx) |
385 self._sslctx = sslctx; | 385 self._sslctx = sslctx; |
386 if sslctx then | 386 if sslctx then |
387 self.starttls = nil; -- use starttls() of interface_mt | 387 self.starttls = nil; -- use starttls() of interface_mt |
388 else | 388 else |
394 if pattern then | 394 if pattern then |
395 self._pattern = pattern; | 395 self._pattern = pattern; |
396 end | 396 end |
397 return self._pattern; | 397 return self._pattern; |
398 end | 398 end |
399 | 399 |
400 function interface_mt:set_send(new_send) | 400 function interface_mt:set_send(new_send) |
401 -- No-op, we always use the underlying connection's send | 401 -- No-op, we always use the underlying connection's send |
402 end | 402 end |
403 | 403 |
404 function interface_mt:starttls(sslctx, call_onconnect) | 404 function interface_mt:starttls(sslctx, call_onconnect) |
405 debug( "try to start ssl at client id:", self.id ) | 405 debug( "try to start ssl at client id:", self.id ) |
406 local err | 406 local err |
407 self._sslctx = sslctx; | 407 self._sslctx = sslctx; |
408 if self._usingssl then -- startssl was already called | 408 if self._usingssl then -- startssl was already called |
427 debug "ssl session delayed until writebuffer is empty..." | 427 debug "ssl session delayed until writebuffer is empty..." |
428 end | 428 end |
429 self.starttls = false; | 429 self.starttls = false; |
430 return true | 430 return true |
431 end | 431 end |
432 | 432 |
433 function interface_mt:setoption(option, value) | 433 function interface_mt:setoption(option, value) |
434 if self.conn.setoption then | 434 if self.conn.setoption then |
435 return self.conn:setoption(option, value); | 435 return self.conn:setoption(option, value); |
436 end | 436 end |
437 return false, "setoption not implemented"; | 437 return false, "setoption not implemented"; |
438 end | 438 end |
439 | 439 |
440 function interface_mt:setlistener(listener) | 440 function interface_mt:setlistener(listener) |
441 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onstatus | 441 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, self.onreadtimeout, self.onstatus |
442 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, listener.onstatus; | 442 = listener.onconnect, listener.ondisconnect, listener.onincoming, |
443 end | 443 listener.ontimeout, listener.onreadtimeout, listener.onstatus; |
444 | 444 end |
445 | |
445 -- Stub handlers | 446 -- Stub handlers |
446 function interface_mt:onconnect() | 447 function interface_mt:onconnect() |
447 end | 448 end |
448 function interface_mt:onincoming() | 449 function interface_mt:onincoming() |
449 end | 450 end |
450 function interface_mt:ondisconnect() | 451 function interface_mt:ondisconnect() |
451 end | 452 end |
452 function interface_mt:ontimeout() | 453 function interface_mt:ontimeout() |
454 end | |
455 function interface_mt:onreadtimeout() | |
456 self.fatalerror = "timeout during receiving" | |
457 debug( "connection failed:", self.fatalerror ) | |
458 self:_close() | |
459 self.eventread = nil | |
453 end | 460 end |
454 function interface_mt:ondrain() | 461 function interface_mt:ondrain() |
455 end | 462 end |
456 function interface_mt:onstatus() | 463 function interface_mt:onstatus() |
457 end | 464 end |
476 receive = client.receive; | 483 receive = client.receive; |
477 onconnect = listener.onconnect; -- will be called when client disconnects | 484 onconnect = listener.onconnect; -- will be called when client disconnects |
478 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | 485 ondisconnect = listener.ondisconnect; -- will be called when client disconnects |
479 onincoming = listener.onincoming; -- will be called when client sends data | 486 onincoming = listener.onincoming; -- will be called when client sends data |
480 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | 487 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs |
488 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
481 ondrain = listener.ondrain; -- called when writebuffer is empty | 489 ondrain = listener.ondrain; -- called when writebuffer is empty |
482 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | 490 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) |
483 eventread = false, eventwrite = false, eventclose = false, | 491 eventread = false, eventwrite = false, eventclose = false, |
484 eventhandshake = false, eventstarthandshake = false; -- event handler | 492 eventhandshake = false, eventstarthandshake = false; -- event handler |
485 eventconnect = false, eventsession = false; -- more event handler... | 493 eventconnect = false, eventsession = false; -- more event handler... |
490 readcallback = false; -- will be called on read events | 498 readcallback = false; -- will be called on read events |
491 nointerface = true; -- lock/unlock parameter of this interface | 499 nointerface = true; -- lock/unlock parameter of this interface |
492 noreading = false, nowriting = false; -- locks of the read/writecallback | 500 noreading = false, nowriting = false; -- locks of the read/writecallback |
493 startsslcallback = false; -- starting handshake callback | 501 startsslcallback = false; -- starting handshake callback |
494 position = false; -- position of client in interfacelist | 502 position = false; -- position of client in interfacelist |
495 | 503 |
496 -- Properties | 504 -- Properties |
497 _ip = ip, _port = port, _server = server, _pattern = pattern, | 505 _ip = ip, _port = port, _server = server, _pattern = pattern, |
498 _serverport = (server and server:port() or nil), | 506 _serverport = (server and server:port() or nil), |
499 _sslctx = sslctx; -- parameters | 507 _sslctx = sslctx; -- parameters |
500 _usingssl = false; -- client is using ssl; | 508 _usingssl = false; -- client is using ssl; |
566 interface.eventwrite = nil | 574 interface.eventwrite = nil |
567 return -1 | 575 return -1 |
568 end | 576 end |
569 end | 577 end |
570 end | 578 end |
571 | 579 |
572 interface.readcallback = function( event ) -- called on read events | 580 interface.readcallback = function( event ) -- called on read events |
573 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) | 581 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) |
574 if interface.noreading or interface.fatalerror then -- leave this event | 582 if interface.noreading or interface.fatalerror then -- leave this event |
575 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) | 583 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) |
576 interface.eventread = nil | 584 interface.eventread = nil |
577 return -1 | 585 return -1 |
578 end | 586 end |
579 if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect | 587 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then |
580 interface.fatalerror = "timeout during receiving" | 588 return -1 -- took too long to get some data from client -> disconnect |
581 debug( "connection failed:", interface.fatalerror ) | 589 end |
590 if interface._usingssl then -- handle luasec | |
591 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | |
592 local ret = interface.writecallback( ) -- call it | |
593 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | |
594 end | |
595 if interface.eventreadtimeout then | |
596 interface.eventreadtimeout:close( ) | |
597 interface.eventreadtimeout = nil | |
598 end | |
599 end | |
600 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | |
601 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | |
602 buffer = buffer or part | |
603 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | |
604 interface.fatalerror = "receive buffer exceeded" | |
605 debug( "fatal error:", interface.fatalerror ) | |
582 interface:_close() | 606 interface:_close() |
583 interface.eventread = nil | 607 interface.eventread = nil |
584 return -1 | 608 return -1 |
585 else -- can read | 609 end |
586 if interface._usingssl then -- handle luasec | 610 if err and ( err ~= "timeout" and err ~= "wantread" ) then |
587 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | 611 if "wantwrite" == err then -- need to read on write event |
588 local ret = interface.writecallback( ) -- call it | 612 if not interface.eventwrite then -- register new write event if needed |
589 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | 613 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) |
590 end | 614 end |
591 if interface.eventreadtimeout then | 615 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, |
592 interface.eventreadtimeout:close( ) | 616 function( ) |
593 interface.eventreadtimeout = nil | 617 interface:_close() |
594 end | 618 end, cfg.READ_TIMEOUT |
595 end | 619 ) |
596 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | 620 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) |
597 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | 621 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... |
598 buffer = buffer or part | 622 else -- connection was closed or fatal error |
599 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | 623 interface.fatalerror = err |
600 interface.fatalerror = "receive buffer exceeded" | 624 debug( "connection failed in read event:", interface.fatalerror ) |
601 debug( "fatal error:", interface.fatalerror ) | |
602 interface:_close() | 625 interface:_close() |
603 interface.eventread = nil | 626 interface.eventread = nil |
604 return -1 | 627 return -1 |
605 end | 628 end |
606 if err and ( err ~= "timeout" and err ~= "wantread" ) then | 629 else |
607 if "wantwrite" == err then -- need to read on write event | 630 interface.onincoming( interface, buffer, err ) -- send new data to listener |
608 if not interface.eventwrite then -- register new write event if needed | 631 end |
609 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | 632 if interface.noreading then |
610 end | 633 interface.eventread = nil; |
611 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, | 634 return -1; |
612 function( ) | 635 end |
613 interface:_close() | 636 return EV_READ, cfg.READ_TIMEOUT |
614 end, cfg.READ_TIMEOUT | |
615 ) | |
616 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) | |
617 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... | |
618 else -- connection was closed or fatal error | |
619 interface.fatalerror = err | |
620 debug( "connection failed in read event:", interface.fatalerror ) | |
621 interface:_close() | |
622 interface.eventread = nil | |
623 return -1 | |
624 end | |
625 else | |
626 interface.onincoming( interface, buffer, err ) -- send new data to listener | |
627 end | |
628 if interface.noreading then | |
629 interface.eventread = nil; | |
630 return -1; | |
631 end | |
632 return EV_READ, cfg.READ_TIMEOUT | |
633 end | |
634 end | 637 end |
635 | 638 |
636 client:settimeout( 0 ) -- set non blocking | 639 client:settimeout( 0 ) -- set non blocking |
637 setmetatable(interface, interface_mt) | 640 setmetatable(interface, interface_mt) |
638 interfacelist( "add", interface ) -- add to interfacelist | 641 interfacelist( "add", interface ) -- add to interfacelist |
644 do | 647 do |
645 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface | 648 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface |
646 debug "creating server interface..." | 649 debug "creating server interface..." |
647 local interface = { | 650 local interface = { |
648 _connections = 0; | 651 _connections = 0; |
649 | 652 |
650 conn = server; | 653 conn = server; |
651 onconnect = listener.onconnect; -- will be called when new client connected | 654 onconnect = listener.onconnect; -- will be called when new client connected |
652 eventread = false; -- read event handler | 655 eventread = false; -- read event handler |
653 eventclose = false; -- close event handler | 656 eventclose = false; -- close event handler |
654 readcallback = false; -- read event callback | 657 readcallback = false; -- read event callback |
655 fatalerror = false; -- error message | 658 fatalerror = false; -- error message |
656 nointerface = true; -- lock/unlock parameter | 659 nointerface = true; -- lock/unlock parameter |
657 | 660 |
658 _ip = addr, _port = port, _pattern = pattern, | 661 _ip = addr, _port = port, _pattern = pattern, |
659 _sslctx = sslctx; | 662 _sslctx = sslctx; |
660 } | 663 } |
661 interface.id = tostring(interface):match("%x+$"); | 664 interface.id = tostring(interface):match("%x+$"); |
662 interface.readcallback = function( event ) -- server handler, called on incoming connections | 665 interface.readcallback = function( event ) -- server handler, called on incoming connections |
691 clientinterface:starttls(sslctx, true) | 694 clientinterface:starttls(sslctx, true) |
692 else | 695 else |
693 clientinterface:_start_session( true ) | 696 clientinterface:_start_session( true ) |
694 end | 697 end |
695 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); | 698 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); |
696 | 699 |
697 client, err = server:accept() -- try to accept again | 700 client, err = server:accept() -- try to accept again |
698 end | 701 end |
699 return EV_READ | 702 return EV_READ |
700 end | 703 end |
701 | 704 |
702 server:settimeout( 0 ) | 705 server:settimeout( 0 ) |
703 setmetatable(interface, interface_mt) | 706 setmetatable(interface, interface_mt) |
704 interfacelist( "add", interface ) | 707 interfacelist( "add", interface ) |
705 interface:_start_session() | 708 interface:_start_session() |
706 return interface | 709 return interface |
739 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) | 742 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) |
740 interface:_start_connection(sslctx) | 743 interface:_start_connection(sslctx) |
741 return interface, client | 744 return interface, client |
742 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | 745 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface |
743 end | 746 end |
744 | 747 |
745 function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) | 748 function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) |
746 local client, err = socket.tcp() -- creating new socket | 749 local client, err = socket.tcp() -- creating new socket |
747 if not client then | 750 if not client then |
748 debug( "cannot create socket:", err ) | 751 debug( "cannot create socket:", err ) |
749 return nil, err | 752 return nil, err |
830 return signal_events[signal_num]; | 833 return signal_events[signal_num]; |
831 end | 834 end |
832 | 835 |
833 local function link(sender, receiver, buffersize) | 836 local function link(sender, receiver, buffersize) |
834 local sender_locked; | 837 local sender_locked; |
835 | 838 |
836 function receiver:ondrain() | 839 function receiver:ondrain() |
837 if sender_locked then | 840 if sender_locked then |
838 sender:resume(); | 841 sender:resume(); |
839 sender_locked = nil; | 842 sender_locked = nil; |
840 end | 843 end |
841 end | 844 end |
842 | 845 |
843 function sender:onincoming(data) | 846 function sender:onincoming(data) |
844 receiver:write(data); | 847 receiver:write(data); |
845 if receiver.writebufferlen >= buffersize then | 848 if receiver.writebufferlen >= buffersize then |
846 sender_locked = true; | 849 sender_locked = true; |
847 sender:pause(); | 850 sender:pause(); |