Software /
code /
prosody
Changeset
2318:a831de056de3
net.server_select, net.server_event: Convert to unix line endings from Windows
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 04 Dec 2009 03:07:17 +0000 |
parents | 2317:aa58136622bc |
children | 2319:8dca45e17438 |
files | net/server_event.lua net/server_select.lua |
diffstat | 2 files changed, 1707 insertions(+), 1707 deletions(-) [+] |
line wrap: on
line diff
--- a/net/server_event.lua Fri Dec 04 03:02:25 2009 +0000 +++ b/net/server_event.lua Fri Dec 04 03:07:17 2009 +0000 @@ -1,786 +1,786 @@ ---[[ - - - server.lua based on lua/libevent by blastbeat - - notes: - -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE - -- you cant even register a new EV_READ/EV_WRITE callback inside another one - -- never call eventcallback:close( ) from inside eventcallback - -- to do some of the above, use timeout events or something what will called from outside - -- dont let garbagecollect eventcallbacks, as long they are running - -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing - ---]] - - -local SCRIPT_NAME = "server_event.lua" -local SCRIPT_VERSION = "0.05" -local SCRIPT_AUTHOR = "blastbeat" -local LAST_MODIFIED = "2009/11/20" - -local cfg = { - MAX_CONNECTIONS = 100000, -- max per server connections (use "ulimit -n" on *nix) - MAX_HANDSHAKE_ATTEMPS = 10, -- attemps to finish ssl handshake - HANDSHAKE_TIMEOUT = 1, -- timout in seconds per handshake attemp - MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets - MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) - ACCEPT_DELAY = 10, -- seconds to wait until the next attemp of a full server to accept - READ_TIMEOUT = 60 * 30, -- timeout in seconds for read data from socket - WRITE_TIMEOUT = 30, -- timeout in seconds for write data on socket - CONNECT_TIMEOUT = 10, -- timeout in seconds for connection attemps - CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) - DEBUG = true, -- show debug messages -} - -local function use(x) return rawget(_G, x); end -local print = use "print" -local pcall = use "pcall" -local ipairs = use "ipairs" -local string = use "string" -local select = use "select" -local require = use "require" -local tostring = use "tostring" -local coroutine = use "coroutine" -local setmetatable = use "setmetatable" - -local ssl = use "ssl" -local socket = use "socket" - -local log = require ("util.logger").init("socket") - -local function debug(...) - return log("debug", ("%s "):rep(select('#', ...)), ...) -end -local vdebug = debug; - -local bitor = ( function( ) -- thx Rici Lake - local hasbit = function( x, p ) - return x % ( p + p ) >= p - end - return function( x, y ) - local p = 1 - local z = 0 - local limit = x > y and x or y - while p <= limit do - if hasbit( x, p ) or hasbit( y, p ) then - z = z + p - end - p = p + p - end - return z - end -end )( ) - -local event = require "luaevent.core" -local base = event.new( ) -local EV_READ = event.EV_READ -local EV_WRITE = event.EV_WRITE -local EV_TIMEOUT = event.EV_TIMEOUT - -local EV_READWRITE = bitor( EV_READ, EV_WRITE ) - -local interfacelist = ( function( ) -- holds the interfaces for sockets - local array = { } - local len = 0 - return function( method, arg ) - if "add" == method then - len = len + 1 - array[ len ] = arg - arg:_position( len ) - return len - elseif "delete" == method then - if len <= 0 then - return nil, "array is already empty" - end - local position = arg:_position() -- get position in array - if position ~= len then - local interface = array[ len ] -- get last interface - array[ position ] = interface -- copy it into free position - array[ len ] = nil -- free last position - interface:_position( position ) -- set new position in array - else -- free last position - array[ len ] = nil - end - len = len - 1 - return len - else - return array - end - end -end )( ) - --- Client interface methods -local interface_mt -do - interface_mt = {}; interface_mt.__index = interface_mt; - - local addevent = base.addevent - local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield - local string_len = string.len - - -- Private methods - function interface_mt:_position(new_position) - self.position = new_position or self.position - return self.position; - end - function interface_mt:_close() -- regs event to start self:_destroy() - local callback = function( ) - self:_destroy(); - self.eventclose = nil - return -1 - end - self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - return true - end - - function interface_mt:_start_connection(plainssl) -- should be called from addclient - local callback = function( event ) - if EV_TIMEOUT == event then -- timout during connection - self.fatalerror = "connection timeout" - self:ontimeout() -- call timeout listener - self:_close() - debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) - else - if plainssl then -- start ssl session - self:_start_ssl( self.listener.onconnect ) - else -- normal connection - self:_start_session( self.listener.onconnect ) - end - debug( "new connection established. id:", self.id ) - end - self.eventconnect = nil - return -1 - end - self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) - return true - end - function interface_mt:_start_session(onconnect) -- new session, for example after startssl - if self.type == "client" then - local callback = function( ) - self:_lock( false, false, false ) - --vdebug( "start listening on client socket with id:", self.id ) - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ) -- register callback - self:onconnect() - self.eventsession = nil - return -1 - end - self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) - else - self:_lock( false ) - --vdebug( "start listening on server socket with id:", self.id ) - self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback - end - return true - end - function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first - --vdebug( "starting ssl session with client id:", self.id ) - local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! - _ = self.eventwrite and self.eventwrite:close( ) - self.eventread, self.eventwrite = nil, nil - local err - self.conn, err = ssl.wrap( self.conn, self._sslctx ) - if err then - self.fatalerror = err - self.conn = nil -- cannot be used anymore - if "onconnect" == arg then - self.ondisconnect = nil -- dont call this when client isnt really connected - end - self:_close() - debug( "fatal error while ssl wrapping:", err ) - return false - end - self.conn:settimeout( 0 ) -- set non blocking - local handshakecallback = coroutine_wrap( - function( event ) - local _, err - local attempt = 0 - local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS - while attempt < 1000 do -- no endless loop - attempt = attempt + 1 - debug( "ssl handshake of client with id:"..tostring(self).."attemp:"..attempt ) - if attempt > maxattempt then - self.fatalerror = "max handshake attemps exceeded" - elseif EV_TIMEOUT == event then - self.fatalerror = "timeout during handshake" - else - _, err = self.conn:dohandshake( ) - if not err then - self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed - self.send = self.conn.send -- caching table lookups with new client object - self.receive = self.conn.receive - local onsomething - if "onconnect" == arg then -- trigger listener - onsomething = self.onconnect - else - onsomething = self.onsslconnection - end - self:_start_session( onsomething ) - debug( "ssl handshake done" ) - self.eventhandshake = nil - return -1 - end - debug( "error during ssl handshake:", err ) - if err == "wantwrite" then - event = EV_WRITE - elseif err == "wantread" then - event = EV_READ - else - self.fatalerror = err - end - end - if self.fatalerror then - if "onconnect" == arg then - self.ondisconnect = nil -- dont call this when client isnt really connected - end - self:_close() - debug( "handshake failed because:", self.fatalerror ) - self.eventhandshake = nil - return -1 - end - event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... - end - end - ) - debug "starting handshake..." - self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked - self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) - return true - end - function interface_mt:_destroy() -- close this interface + events and call last listener - debug( "closing client with id:", self.id ) - self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions - local _ - _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! - if self.type == "client" then - _ = self.eventwrite and self.eventwrite:close( ) - _ = self.eventhandshake and self.eventhandshake:close( ) - _ = self.eventstarthandshake and self.eventstarthandshake:close( ) - _ = self.eventconnect and self.eventconnect:close( ) - _ = self.eventsession and self.eventsession:close( ) - _ = self.eventwritetimeout and self.eventwritetimeout:close( ) - _ = self.eventreadtimeout and self.eventreadtimeout:close( ) - _ = self.ondisconnect and self:ondisconnect( self.fatalerror ) -- call ondisconnect listener (wont be the case if handshake failed on connect) - _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! - _ = self._server and self._server:counter(-1); - self.eventread, self.eventwrite = nil, nil - self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil - self.readcallback, self.writecallback = nil, nil - else - self.conn:close( ) - self.eventread, self.eventclose = nil, nil - self.interface, self.readcallback = nil, nil - end - interfacelist( "delete", self ) - return true - end - - function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events - self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting - return nointerface, noreading, nowriting - end - - function interface_mt:counter(c) - if c then - self._connections = self._connections + c - end - return self._connections - end - - -- Public methods - function interface_mt:write(data) - if self.nowriting then return nil, "locked" end - --vdebug( "try to send data to client, id/data:", self.id, data ) - data = tostring( data ) - local len = string_len( data ) - local total = len + self.writebufferlen - if total > cfg.MAX_SEND_LENGTH then -- check buffer length - local err = "send buffer exceeded" - debug( "error:", err ) -- to much, check your app - return nil, err - end - self.writebuffer = self.writebuffer .. data -- new buffer - self.writebufferlen = total - if not self.eventwrite then -- register new write event - --vdebug( "register new write event" ) - self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) - end - return true - end - function interface_mt:close(now) - if self.nointerface then return nil, "locked"; end - debug( "try to close client connection with id:", self.id ) - if self.type == "client" then - self.fatalerror = "client to close" - if ( not self.eventwrite ) or now then -- try to close immediately - self:_lock( true, true, true ) - self:_close() - return true - else -- wait for incomplete write request - self:_lock( true, true, false ) - debug "closing delayed until writebuffer is empty" - return nil, "writebuffer not empty, waiting" - end - else - debug( "try to close server with id:", self.id, "args:", now ) - self.fatalerror = "server to close" - self:_lock( true ) - local count = 0 - for _, item in ipairs( interfacelist( ) ) do - if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match - if item:close( now ) then -- writebuffer was empty - count = count + 1 - end - end - end - local timeout = 0 -- dont wait for unfinished writebuffers of clients... - if not now then - timeout = cfg.WRITE_TIMEOUT -- ...or wait for it - end - self:_close( timeout ) -- add new event to remove the server interface - debug( "seconds remained until server is closed:", timeout ) - return count -- returns finished clients with empty writebuffer - end - end - - function interface_mt:server() - return self._server or self; - end - - function interface_mt:port() - return self._port - end - - function interface_mt:ip() - return self._ip - end - - function interface_mt:ssl() - return self._usingssl - end - - function interface_mt:type() - return self._type or "client" - end - - function interface_mt:connections() - return self._connections - end - - function interface_mt:address() - return self.addr - end - - function interface_mt:set_sslctx(sslctx) - self._sslctx = sslctx; - if sslctx then - self.starttls = nil; -- use starttls() of interface_mt - else - self.starttls = false; -- prevent starttls() - end - end - - function interface_mt:set_send(new_send) - -- No-op, we always use the underlying connection's send - end - - function interface_mt:starttls() - debug( "try to start ssl at client id:", self.id ) - local err - if not self._sslctx then -- no ssl available - err = "no ssl context available" - elseif self._usingssl then -- startssl was already called - err = "ssl already active" - end - if err then - debug( "error:", err ) - return nil, err - end - self._usingssl = true - self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event - self.startsslcallback = nil - self:_start_ssl(); - self.eventstarthandshake = nil - return -1 - end - if not self.eventwrite then - self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake - self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake - else -- wait until writebuffer is empty - self:_lock( true, true, false ) - debug "ssl session delayed until writebuffer is empty..." - end - return true - end - - -- Stub handlers - function interface_mt:onconnect() - end - function interface_mt:onincoming() - end - function interface_mt:ondisconnect() - end - function interface_mt:ontimeout() - end -end - --- End of client interface methods - -local handleclient; -do - local string_sub = string.sub -- caching table lookups - local string_len = string.len - local addevent = base.addevent - local coroutine_wrap = coroutine.wrap - local socket_gettime = socket.gettime - local coroutine_yield = coroutine.yield - function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface - --vdebug("creating client interfacce...") - local interface = { - type = "client"; - conn = client; - currenttime = socket_gettime( ); -- safe the origin - writebuffer = ""; -- writebuffer - writebufferlen = 0; -- length of writebuffer - send = client.send; -- caching table lookups - receive = client.receive; - onconnect = listener.onconnect; -- will be called when client disconnects - ondisconnect = listener.ondisconnect; -- will be called when client disconnects - onincoming = listener.onincoming; -- will be called when client sends data - ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs - eventread = false, eventwrite = false, eventclose = false, - eventhandshake = false, eventstarthandshake = false; -- event handler - eventconnect = false, eventsession = false; -- more event handler... - eventwritetimeout = false; -- even more event handler... - eventreadtimeout = false; - fatalerror = false; -- error message - writecallback = false; -- will be called on write events - readcallback = false; -- will be called on read events - nointerface = true; -- lock/unlock parameter of this interface - noreading = false, nowriting = false; -- locks of the read/writecallback - startsslcallback = false; -- starting handshake callback - position = false; -- position of client in interfacelist - - -- Properties - _ip = ip, _port = port, _server = server, _pattern = pattern, - _sslctx = sslctx; -- parameters - _usingssl = false; -- client is using ssl; - } - if not sslctx then - interface.starttls = false -- don't allow TLS - end - interface.id = tostring(interface):match("%x+$"); - interface.writecallback = function( event ) -- called on write events - --vdebug( "new client write event, id/ip/port:", interface, ip, port ) - if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event - --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) - interface.eventwrite = false - return -1 - end - if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect - interface.fatalerror = "timeout during writing" - debug( "writing failed:", interface.fatalerror ) - interface:_close() - interface.eventwrite = false - return -1 - else -- can write :) - if interface._usingssl then -- handle luasec - if interface.eventreadtimeout then -- we have to read first - local ret = interface.readcallback( ) -- call readcallback - --vdebug( "tried to read in writecallback, result:", ret ) - end - if interface.eventwritetimeout then -- luasec only - interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error - interface.eventwritetimeout = false - end - end - local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen ) - --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) - if succ then -- writing succesful - interface.writebuffer = "" - interface.writebufferlen = 0 - if interface.fatalerror then - debug "closing client after writing" - interface:_close() -- close interface if needed - elseif interface.startsslcallback then -- start ssl connection if needed - debug "starting ssl handshake after writing" - interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) - elseif interface.eventreadtimeout then - return EV_WRITE, EV_TIMEOUT - end - interface.eventwrite = nil - return -1 - elseif byte then -- want write again - --vdebug( "writebuffer is not empty:", err ) - interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer - interface.writebufferlen = interface.writebufferlen - byte - if "wantread" == err then -- happens only with luasec - local callback = function( ) - interface:_close() - interface.eventwritetimeout = nil - return evreturn, evtimeout - end - interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event - debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." ) - -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent - return -1 - end - return EV_WRITE, cfg.WRITE_TIMEOUT - else -- connection was closed during writing or fatal error - interface.fatalerror = err or "fatal error" - debug( "connection failed in write event:", interface.fatalerror ) - interface:_close() - interface.eventwrite = nil - return -1 - end - end - end - - interface.readcallback = function( event ) -- called on read events - --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) - if interface.noreading or interface.fatalerror then -- leave this event - --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) - interface.eventread = nil - return -1 - end - if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect - interface.fatalerror = "timeout during receiving" - debug( "connection failed:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - else -- can read - if interface._usingssl then -- handle luasec - if interface.eventwritetimeout then -- ok, in the past writecallback was regged - local ret = interface.writecallback( ) -- call it - --vdebug( "tried to write in readcallback, result:", tostring(ret) ) - end - if interface.eventreadtimeout then - interface.eventreadtimeout:close( ) - interface.eventreadtimeout = nil - end - end - local buffer, err, part = interface.conn:receive( pattern ) -- receive buffer with "pattern" - --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) - buffer = buffer or part or "" - local len = string_len( buffer ) - if len > cfg.MAX_READ_LENGTH then -- check buffer length - interface.fatalerror = "receive buffer exceeded" - debug( "fatal error:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - end - interface.onincoming( interface, buffer, err ) -- send new data to listener - if err and ( err ~= "timeout" and err ~= "wantread" ) then - if "wantwrite" == err then -- need to read on write event - if not interface.eventwrite then -- register new write event if needed - interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) - end - interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, - function( ) - interface:_close() - end, cfg.READ_TIMEOUT - ) - debug( "wantwrite during read attemp, reg it in writecallback but dont know what really happens next..." ) - -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... - else -- connection was closed or fatal error - interface.fatalerror = err - debug( "connection failed in read event:", interface.fatalerror ) - interface:_close() - interface.eventread = nil - return -1 - end - end - return EV_READ, cfg.READ_TIMEOUT - end - end - - client:settimeout( 0 ) -- set non blocking - setmetatable(interface, interface_mt) - interfacelist( "add", interface ) -- add to interfacelist - return interface - end -end - -local handleserver -do - function handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- creates an server interface - debug "creating server interface..." - local interface = { - _connections = 0; - - conn = server; - onconnect = listener.onconnect; -- will be called when new client connected - eventread = false; -- read event handler - eventclose = false; -- close event handler - readcallback = false; -- read event callback - fatalerror = false; -- error message - nointerface = true; -- lock/unlock parameter - } - interface.id = tostring(interface):match("%x+$"); - interface.readcallback = function( event ) -- server handler, called on incoming connections - --vdebug( "server can accept, id/addr/port:", interface, addr, port ) - if interface.fatalerror then - --vdebug( "leaving this event because:", self.fatalerror ) - interface.eventread = nil - return -1 - end - local delay = cfg.ACCEPT_DELAY - if EV_TIMEOUT == event then - if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count - debug( "to many connections, seconds to wait for next accept:", delay ) - return EV_TIMEOUT, delay -- timeout... - else - return EV_READ -- accept again - end - end - --vdebug("max connection check ok, accepting...") - local client, err = server:accept() -- try to accept; TODO: check err - while client do - if interface._connections >= cfg.MAX_CONNECTIONS then - client:close( ) -- refuse connection - debug( "maximal connections reached, refuse client connection; accept delay:", delay ) - return EV_TIMEOUT, delay -- delay for next accept attemp - end - local ip, port = client:getpeername( ) - interface._connections = interface._connections + 1 -- increase connection count - local clientinterface = handleclient( client, ip, port, interface, pattern, listener, nil, sslctx ) - --vdebug( "client id:", clientinterface, "startssl:", startssl ) - if startssl then - clientinterface:_start_ssl( clientinterface.onconnect ) - else - clientinterface:_start_session( clientinterface.onconnect ) - end - debug( "accepted incoming client connection from:", ip, port ) - client, err = server:accept() -- try to accept again - end - return EV_READ - end - - server:settimeout( 0 ) - setmetatable(interface, interface_mt) - interfacelist( "add", interface ) - interface:_start_session() - return interface - end -end - -local addserver = ( function( ) - return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments - --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") - local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket - if not server then - debug( "creating server socket failed because:", err ) - return nil, err - end - local sslctx - if sslcfg then - if not ssl then - debug "fatal error: luasec not found" - return nil, "luasec not found" - end - sslctx, err = ssl.newcontext( sslcfg ) - if err then - debug( "error while creating new ssl context for server socket:", err ) - return nil, err - end - end - local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler - debug( "new server created with id:", tostring(interface)) - return interface - end -end )( ) - -local addclient, wrapclient -do - function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl ) - local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) - interface:_start_session() - return interface - --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface - end - - function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) - local client, err = socket.tcp() -- creating new socket - if not client then - debug( "cannot create socket:", err ) - return nil, err - end - client:settimeout( 0 ) -- set nonblocking - if localaddr then - local res, err = client:bind( localaddr, localport, -1 ) - if not res then - debug( "cannot bind client:", err ) - return nil, err - end - end - local sslctx - if sslcfg then -- handle ssl/new context - if not ssl then - debug "need luasec, but not available" - return nil, "luasec not found" - end - sslctx, err = ssl.newcontext( sslcfg ) - if err then - debug( "cannot create new ssl context:", err ) - return nil, err - end - end - local res, err = client:connect( addr, serverport ) -- connect - if res or ( err == "timeout" ) then - local ip, port = client:getsockname( ) - local server = function( ) - return nil, "this is a dummy server interface" - end - local interface = wrapclient( client, ip, serverport, listeners, pattern, sslctx, startssl ) - interface:_start_connection( startssl ) - debug( "new connection id:", interface.id ) - return interface, err - else - debug( "new connection failed:", err ) - return nil, err - end - end -end - - -local loop = function( ) -- starts the event loop - return base:loop( ) -end - -local newevent = ( function( ) - local add = base.addevent - return function( ... ) - return add( base, ... ) - end -end )( ) - -local closeallservers = function( arg ) - for _, item in ipairs( interfacelist( ) ) do - if item "type" == "server" then - item( "close", arg ) - end - end -end - -return { - - cfg = cfg, - base = base, - loop = loop, - event = event, - event_base = base, - addevent = newevent, - addserver = addserver, - addclient = addclient, - wrapclient = wrapclient, - closeallservers = closeallservers, - - __NAME = SCRIPT_NAME, - __DATE = LAST_MODIFIED, - __AUTHOR = SCRIPT_AUTHOR, - __VERSION = SCRIPT_VERSION, - -} +--[[ + + + server.lua based on lua/libevent by blastbeat + + notes: + -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE + -- you cant even register a new EV_READ/EV_WRITE callback inside another one + -- never call eventcallback:close( ) from inside eventcallback + -- to do some of the above, use timeout events or something what will called from outside + -- dont let garbagecollect eventcallbacks, as long they are running + -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing + +--]] + + +local SCRIPT_NAME = "server_event.lua" +local SCRIPT_VERSION = "0.05" +local SCRIPT_AUTHOR = "blastbeat" +local LAST_MODIFIED = "2009/11/20" + +local cfg = { + MAX_CONNECTIONS = 100000, -- max per server connections (use "ulimit -n" on *nix) + MAX_HANDSHAKE_ATTEMPS = 10, -- attemps to finish ssl handshake + HANDSHAKE_TIMEOUT = 1, -- timout in seconds per handshake attemp + MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets + MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) + ACCEPT_DELAY = 10, -- seconds to wait until the next attemp of a full server to accept + READ_TIMEOUT = 60 * 30, -- timeout in seconds for read data from socket + WRITE_TIMEOUT = 30, -- timeout in seconds for write data on socket + CONNECT_TIMEOUT = 10, -- timeout in seconds for connection attemps + CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) + DEBUG = true, -- show debug messages +} + +local function use(x) return rawget(_G, x); end +local print = use "print" +local pcall = use "pcall" +local ipairs = use "ipairs" +local string = use "string" +local select = use "select" +local require = use "require" +local tostring = use "tostring" +local coroutine = use "coroutine" +local setmetatable = use "setmetatable" + +local ssl = use "ssl" +local socket = use "socket" + +local log = require ("util.logger").init("socket") + +local function debug(...) + return log("debug", ("%s "):rep(select('#', ...)), ...) +end +local vdebug = debug; + +local bitor = ( function( ) -- thx Rici Lake + local hasbit = function( x, p ) + return x % ( p + p ) >= p + end + return function( x, y ) + local p = 1 + local z = 0 + local limit = x > y and x or y + while p <= limit do + if hasbit( x, p ) or hasbit( y, p ) then + z = z + p + end + p = p + p + end + return z + end +end )( ) + +local event = require "luaevent.core" +local base = event.new( ) +local EV_READ = event.EV_READ +local EV_WRITE = event.EV_WRITE +local EV_TIMEOUT = event.EV_TIMEOUT + +local EV_READWRITE = bitor( EV_READ, EV_WRITE ) + +local interfacelist = ( function( ) -- holds the interfaces for sockets + local array = { } + local len = 0 + return function( method, arg ) + if "add" == method then + len = len + 1 + array[ len ] = arg + arg:_position( len ) + return len + elseif "delete" == method then + if len <= 0 then + return nil, "array is already empty" + end + local position = arg:_position() -- get position in array + if position ~= len then + local interface = array[ len ] -- get last interface + array[ position ] = interface -- copy it into free position + array[ len ] = nil -- free last position + interface:_position( position ) -- set new position in array + else -- free last position + array[ len ] = nil + end + len = len - 1 + return len + else + return array + end + end +end )( ) + +-- Client interface methods +local interface_mt +do + interface_mt = {}; interface_mt.__index = interface_mt; + + local addevent = base.addevent + local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield + local string_len = string.len + + -- Private methods + function interface_mt:_position(new_position) + self.position = new_position or self.position + return self.position; + end + function interface_mt:_close() -- regs event to start self:_destroy() + local callback = function( ) + self:_destroy(); + self.eventclose = nil + return -1 + end + self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) + return true + end + + function interface_mt:_start_connection(plainssl) -- should be called from addclient + local callback = function( event ) + if EV_TIMEOUT == event then -- timout during connection + self.fatalerror = "connection timeout" + self:ontimeout() -- call timeout listener + self:_close() + debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) + else + if plainssl then -- start ssl session + self:_start_ssl( self.listener.onconnect ) + else -- normal connection + self:_start_session( self.listener.onconnect ) + end + debug( "new connection established. id:", self.id ) + end + self.eventconnect = nil + return -1 + end + self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) + return true + end + function interface_mt:_start_session(onconnect) -- new session, for example after startssl + if self.type == "client" then + local callback = function( ) + self:_lock( false, false, false ) + --vdebug( "start listening on client socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ) -- register callback + self:onconnect() + self.eventsession = nil + return -1 + end + self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) + else + self:_lock( false ) + --vdebug( "start listening on server socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback + end + return true + end + function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first + --vdebug( "starting ssl session with client id:", self.id ) + local _ + _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventwrite and self.eventwrite:close( ) + self.eventread, self.eventwrite = nil, nil + local err + self.conn, err = ssl.wrap( self.conn, self._sslctx ) + if err then + self.fatalerror = err + self.conn = nil -- cannot be used anymore + if "onconnect" == arg then + self.ondisconnect = nil -- dont call this when client isnt really connected + end + self:_close() + debug( "fatal error while ssl wrapping:", err ) + return false + end + self.conn:settimeout( 0 ) -- set non blocking + local handshakecallback = coroutine_wrap( + function( event ) + local _, err + local attempt = 0 + local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS + while attempt < 1000 do -- no endless loop + attempt = attempt + 1 + debug( "ssl handshake of client with id:"..tostring(self).."attemp:"..attempt ) + if attempt > maxattempt then + self.fatalerror = "max handshake attemps exceeded" + elseif EV_TIMEOUT == event then + self.fatalerror = "timeout during handshake" + else + _, err = self.conn:dohandshake( ) + if not err then + self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed + self.send = self.conn.send -- caching table lookups with new client object + self.receive = self.conn.receive + local onsomething + if "onconnect" == arg then -- trigger listener + onsomething = self.onconnect + else + onsomething = self.onsslconnection + end + self:_start_session( onsomething ) + debug( "ssl handshake done" ) + self.eventhandshake = nil + return -1 + end + debug( "error during ssl handshake:", err ) + if err == "wantwrite" then + event = EV_WRITE + elseif err == "wantread" then + event = EV_READ + else + self.fatalerror = err + end + end + if self.fatalerror then + if "onconnect" == arg then + self.ondisconnect = nil -- dont call this when client isnt really connected + end + self:_close() + debug( "handshake failed because:", self.fatalerror ) + self.eventhandshake = nil + return -1 + end + event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... + end + end + ) + debug "starting handshake..." + self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked + self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) + return true + end + function interface_mt:_destroy() -- close this interface + events and call last listener + debug( "closing client with id:", self.id ) + self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions + local _ + _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + if self.type == "client" then + _ = self.eventwrite and self.eventwrite:close( ) + _ = self.eventhandshake and self.eventhandshake:close( ) + _ = self.eventstarthandshake and self.eventstarthandshake:close( ) + _ = self.eventconnect and self.eventconnect:close( ) + _ = self.eventsession and self.eventsession:close( ) + _ = self.eventwritetimeout and self.eventwritetimeout:close( ) + _ = self.eventreadtimeout and self.eventreadtimeout:close( ) + _ = self.ondisconnect and self:ondisconnect( self.fatalerror ) -- call ondisconnect listener (wont be the case if handshake failed on connect) + _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! + _ = self._server and self._server:counter(-1); + self.eventread, self.eventwrite = nil, nil + self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil + self.readcallback, self.writecallback = nil, nil + else + self.conn:close( ) + self.eventread, self.eventclose = nil, nil + self.interface, self.readcallback = nil, nil + end + interfacelist( "delete", self ) + return true + end + + function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events + self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting + return nointerface, noreading, nowriting + end + + function interface_mt:counter(c) + if c then + self._connections = self._connections + c + end + return self._connections + end + + -- Public methods + function interface_mt:write(data) + if self.nowriting then return nil, "locked" end + --vdebug( "try to send data to client, id/data:", self.id, data ) + data = tostring( data ) + local len = string_len( data ) + local total = len + self.writebufferlen + if total > cfg.MAX_SEND_LENGTH then -- check buffer length + local err = "send buffer exceeded" + debug( "error:", err ) -- to much, check your app + return nil, err + end + self.writebuffer = self.writebuffer .. data -- new buffer + self.writebufferlen = total + if not self.eventwrite then -- register new write event + --vdebug( "register new write event" ) + self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) + end + return true + end + function interface_mt:close(now) + if self.nointerface then return nil, "locked"; end + debug( "try to close client connection with id:", self.id ) + if self.type == "client" then + self.fatalerror = "client to close" + if ( not self.eventwrite ) or now then -- try to close immediately + self:_lock( true, true, true ) + self:_close() + return true + else -- wait for incomplete write request + self:_lock( true, true, false ) + debug "closing delayed until writebuffer is empty" + return nil, "writebuffer not empty, waiting" + end + else + debug( "try to close server with id:", self.id, "args:", now ) + self.fatalerror = "server to close" + self:_lock( true ) + local count = 0 + for _, item in ipairs( interfacelist( ) ) do + if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match + if item:close( now ) then -- writebuffer was empty + count = count + 1 + end + end + end + local timeout = 0 -- dont wait for unfinished writebuffers of clients... + if not now then + timeout = cfg.WRITE_TIMEOUT -- ...or wait for it + end + self:_close( timeout ) -- add new event to remove the server interface + debug( "seconds remained until server is closed:", timeout ) + return count -- returns finished clients with empty writebuffer + end + end + + function interface_mt:server() + return self._server or self; + end + + function interface_mt:port() + return self._port + end + + function interface_mt:ip() + return self._ip + end + + function interface_mt:ssl() + return self._usingssl + end + + function interface_mt:type() + return self._type or "client" + end + + function interface_mt:connections() + return self._connections + end + + function interface_mt:address() + return self.addr + end + + function interface_mt:set_sslctx(sslctx) + self._sslctx = sslctx; + if sslctx then + self.starttls = nil; -- use starttls() of interface_mt + else + self.starttls = false; -- prevent starttls() + end + end + + function interface_mt:set_send(new_send) + -- No-op, we always use the underlying connection's send + end + + function interface_mt:starttls() + debug( "try to start ssl at client id:", self.id ) + local err + if not self._sslctx then -- no ssl available + err = "no ssl context available" + elseif self._usingssl then -- startssl was already called + err = "ssl already active" + end + if err then + debug( "error:", err ) + return nil, err + end + self._usingssl = true + self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event + self.startsslcallback = nil + self:_start_ssl(); + self.eventstarthandshake = nil + return -1 + end + if not self.eventwrite then + self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake + self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake + else -- wait until writebuffer is empty + self:_lock( true, true, false ) + debug "ssl session delayed until writebuffer is empty..." + end + return true + end + + -- Stub handlers + function interface_mt:onconnect() + end + function interface_mt:onincoming() + end + function interface_mt:ondisconnect() + end + function interface_mt:ontimeout() + end +end + +-- End of client interface methods + +local handleclient; +do + local string_sub = string.sub -- caching table lookups + local string_len = string.len + local addevent = base.addevent + local coroutine_wrap = coroutine.wrap + local socket_gettime = socket.gettime + local coroutine_yield = coroutine.yield + function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface + --vdebug("creating client interfacce...") + local interface = { + type = "client"; + conn = client; + currenttime = socket_gettime( ); -- safe the origin + writebuffer = ""; -- writebuffer + writebufferlen = 0; -- length of writebuffer + send = client.send; -- caching table lookups + receive = client.receive; + onconnect = listener.onconnect; -- will be called when client disconnects + ondisconnect = listener.ondisconnect; -- will be called when client disconnects + onincoming = listener.onincoming; -- will be called when client sends data + ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs + eventread = false, eventwrite = false, eventclose = false, + eventhandshake = false, eventstarthandshake = false; -- event handler + eventconnect = false, eventsession = false; -- more event handler... + eventwritetimeout = false; -- even more event handler... + eventreadtimeout = false; + fatalerror = false; -- error message + writecallback = false; -- will be called on write events + readcallback = false; -- will be called on read events + nointerface = true; -- lock/unlock parameter of this interface + noreading = false, nowriting = false; -- locks of the read/writecallback + startsslcallback = false; -- starting handshake callback + position = false; -- position of client in interfacelist + + -- Properties + _ip = ip, _port = port, _server = server, _pattern = pattern, + _sslctx = sslctx; -- parameters + _usingssl = false; -- client is using ssl; + } + if not sslctx then + interface.starttls = false -- don't allow TLS + end + interface.id = tostring(interface):match("%x+$"); + interface.writecallback = function( event ) -- called on write events + --vdebug( "new client write event, id/ip/port:", interface, ip, port ) + if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event + --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) + interface.eventwrite = false + return -1 + end + if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect + interface.fatalerror = "timeout during writing" + debug( "writing failed:", interface.fatalerror ) + interface:_close() + interface.eventwrite = false + return -1 + else -- can write :) + if interface._usingssl then -- handle luasec + if interface.eventreadtimeout then -- we have to read first + local ret = interface.readcallback( ) -- call readcallback + --vdebug( "tried to read in writecallback, result:", ret ) + end + if interface.eventwritetimeout then -- luasec only + interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error + interface.eventwritetimeout = false + end + end + local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen ) + --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) + if succ then -- writing succesful + interface.writebuffer = "" + interface.writebufferlen = 0 + if interface.fatalerror then + debug "closing client after writing" + interface:_close() -- close interface if needed + elseif interface.startsslcallback then -- start ssl connection if needed + debug "starting ssl handshake after writing" + interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) + elseif interface.eventreadtimeout then + return EV_WRITE, EV_TIMEOUT + end + interface.eventwrite = nil + return -1 + elseif byte then -- want write again + --vdebug( "writebuffer is not empty:", err ) + interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer + interface.writebufferlen = interface.writebufferlen - byte + if "wantread" == err then -- happens only with luasec + local callback = function( ) + interface:_close() + interface.eventwritetimeout = nil + return evreturn, evtimeout + end + interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event + debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." ) + -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent + return -1 + end + return EV_WRITE, cfg.WRITE_TIMEOUT + else -- connection was closed during writing or fatal error + interface.fatalerror = err or "fatal error" + debug( "connection failed in write event:", interface.fatalerror ) + interface:_close() + interface.eventwrite = nil + return -1 + end + end + end + + interface.readcallback = function( event ) -- called on read events + --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) + if interface.noreading or interface.fatalerror then -- leave this event + --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) + interface.eventread = nil + return -1 + end + if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect + interface.fatalerror = "timeout during receiving" + debug( "connection failed:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + else -- can read + if interface._usingssl then -- handle luasec + if interface.eventwritetimeout then -- ok, in the past writecallback was regged + local ret = interface.writecallback( ) -- call it + --vdebug( "tried to write in readcallback, result:", tostring(ret) ) + end + if interface.eventreadtimeout then + interface.eventreadtimeout:close( ) + interface.eventreadtimeout = nil + end + end + local buffer, err, part = interface.conn:receive( pattern ) -- receive buffer with "pattern" + --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) + buffer = buffer or part or "" + local len = string_len( buffer ) + if len > cfg.MAX_READ_LENGTH then -- check buffer length + interface.fatalerror = "receive buffer exceeded" + debug( "fatal error:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + end + interface.onincoming( interface, buffer, err ) -- send new data to listener + if err and ( err ~= "timeout" and err ~= "wantread" ) then + if "wantwrite" == err then -- need to read on write event + if not interface.eventwrite then -- register new write event if needed + interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) + end + interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, + function( ) + interface:_close() + end, cfg.READ_TIMEOUT + ) + debug( "wantwrite during read attemp, reg it in writecallback but dont know what really happens next..." ) + -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... + else -- connection was closed or fatal error + interface.fatalerror = err + debug( "connection failed in read event:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + end + end + return EV_READ, cfg.READ_TIMEOUT + end + end + + client:settimeout( 0 ) -- set non blocking + setmetatable(interface, interface_mt) + interfacelist( "add", interface ) -- add to interfacelist + return interface + end +end + +local handleserver +do + function handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- creates an server interface + debug "creating server interface..." + local interface = { + _connections = 0; + + conn = server; + onconnect = listener.onconnect; -- will be called when new client connected + eventread = false; -- read event handler + eventclose = false; -- close event handler + readcallback = false; -- read event callback + fatalerror = false; -- error message + nointerface = true; -- lock/unlock parameter + } + interface.id = tostring(interface):match("%x+$"); + interface.readcallback = function( event ) -- server handler, called on incoming connections + --vdebug( "server can accept, id/addr/port:", interface, addr, port ) + if interface.fatalerror then + --vdebug( "leaving this event because:", self.fatalerror ) + interface.eventread = nil + return -1 + end + local delay = cfg.ACCEPT_DELAY + if EV_TIMEOUT == event then + if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count + debug( "to many connections, seconds to wait for next accept:", delay ) + return EV_TIMEOUT, delay -- timeout... + else + return EV_READ -- accept again + end + end + --vdebug("max connection check ok, accepting...") + local client, err = server:accept() -- try to accept; TODO: check err + while client do + if interface._connections >= cfg.MAX_CONNECTIONS then + client:close( ) -- refuse connection + debug( "maximal connections reached, refuse client connection; accept delay:", delay ) + return EV_TIMEOUT, delay -- delay for next accept attemp + end + local ip, port = client:getpeername( ) + interface._connections = interface._connections + 1 -- increase connection count + local clientinterface = handleclient( client, ip, port, interface, pattern, listener, nil, sslctx ) + --vdebug( "client id:", clientinterface, "startssl:", startssl ) + if startssl then + clientinterface:_start_ssl( clientinterface.onconnect ) + else + clientinterface:_start_session( clientinterface.onconnect ) + end + debug( "accepted incoming client connection from:", ip, port ) + client, err = server:accept() -- try to accept again + end + return EV_READ + end + + server:settimeout( 0 ) + setmetatable(interface, interface_mt) + interfacelist( "add", interface ) + interface:_start_session() + return interface + end +end + +local addserver = ( function( ) + return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments + --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") + local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket + if not server then + debug( "creating server socket failed because:", err ) + return nil, err + end + local sslctx + if sslcfg then + if not ssl then + debug "fatal error: luasec not found" + return nil, "luasec not found" + end + sslctx, err = ssl.newcontext( sslcfg ) + if err then + debug( "error while creating new ssl context for server socket:", err ) + return nil, err + end + end + local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler + debug( "new server created with id:", tostring(interface)) + return interface + end +end )( ) + +local addclient, wrapclient +do + function wrapclient( client, ip, port, listeners, pattern, sslctx, startssl ) + local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) + interface:_start_session() + return interface + --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface + end + + function addclient( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) + local client, err = socket.tcp() -- creating new socket + if not client then + debug( "cannot create socket:", err ) + return nil, err + end + client:settimeout( 0 ) -- set nonblocking + if localaddr then + local res, err = client:bind( localaddr, localport, -1 ) + if not res then + debug( "cannot bind client:", err ) + return nil, err + end + end + local sslctx + if sslcfg then -- handle ssl/new context + if not ssl then + debug "need luasec, but not available" + return nil, "luasec not found" + end + sslctx, err = ssl.newcontext( sslcfg ) + if err then + debug( "cannot create new ssl context:", err ) + return nil, err + end + end + local res, err = client:connect( addr, serverport ) -- connect + if res or ( err == "timeout" ) then + local ip, port = client:getsockname( ) + local server = function( ) + return nil, "this is a dummy server interface" + end + local interface = wrapclient( client, ip, serverport, listeners, pattern, sslctx, startssl ) + interface:_start_connection( startssl ) + debug( "new connection id:", interface.id ) + return interface, err + else + debug( "new connection failed:", err ) + return nil, err + end + end +end + + +local loop = function( ) -- starts the event loop + return base:loop( ) +end + +local newevent = ( function( ) + local add = base.addevent + return function( ... ) + return add( base, ... ) + end +end )( ) + +local closeallservers = function( arg ) + for _, item in ipairs( interfacelist( ) ) do + if item "type" == "server" then + item( "close", arg ) + end + end +end + +return { + + cfg = cfg, + base = base, + loop = loop, + event = event, + event_base = base, + addevent = newevent, + addserver = addserver, + addclient = addclient, + wrapclient = wrapclient, + closeallservers = closeallservers, + + __NAME = SCRIPT_NAME, + __DATE = LAST_MODIFIED, + __AUTHOR = SCRIPT_AUTHOR, + __VERSION = SCRIPT_VERSION, + +}
--- a/net/server_select.lua Fri Dec 04 03:02:25 2009 +0000 +++ b/net/server_select.lua Fri Dec 04 03:07:17 2009 +0000 @@ -1,921 +1,921 @@ --- --- server.lua by blastbeat of the luadch project --- Re-used here under the MIT/X Consortium License --- --- Modifications (C) 2008-2009 Matthew Wild, Waqas Hussain --- - --- // wrapping luadch stuff // -- - -local use = function( what ) - return _G[ what ] -end -local clean = function( tbl ) - for i, k in pairs( tbl ) do - tbl[ i ] = nil - end -end - -local log, table_concat = require ("util.logger").init("socket"), table.concat; -local out_put = function (...) return log("debug", table_concat{...}); end -local out_error = function (...) return log("warn", table_concat{...}); end -local mem_free = collectgarbage - -----------------------------------// DECLARATION //-- - ---// constants //-- - -local STAT_UNIT = 1 -- byte - ---// lua functions //-- - -local type = use "type" -local pairs = use "pairs" -local ipairs = use "ipairs" -local tostring = use "tostring" -local collectgarbage = use "collectgarbage" - ---// lua libs //-- - -local os = use "os" -local table = use "table" -local string = use "string" -local coroutine = use "coroutine" - ---// lua lib methods //-- - -local os_time = os.time -local os_difftime = os.difftime -local table_concat = table.concat -local table_remove = table.remove -local string_len = string.len -local string_sub = string.sub -local coroutine_wrap = coroutine.wrap -local coroutine_yield = coroutine.yield - ---// extern libs //-- - -local luasec = select( 2, pcall( require, "ssl" ) ) -local luasocket = require "socket" - ---// extern lib methods //-- - -local ssl_wrap = ( luasec and luasec.wrap ) -local socket_bind = luasocket.bind -local socket_sleep = luasocket.sleep -local socket_select = luasocket.select -local ssl_newcontext = ( luasec and luasec.newcontext ) - ---// functions //-- - -local id -local loop -local stats -local idfalse -local addtimer -local closeall -local addserver -local getserver -local wrapserver -local getsettings -local closesocket -local removesocket -local removeserver -local changetimeout -local wrapconnection -local changesettings - ---// tables //-- - -local _server -local _readlist -local _timerlist -local _sendlist -local _socketlist -local _closelist -local _readtimes -local _writetimes - ---// simple data types //-- - -local _ -local _readlistlen -local _sendlistlen -local _timerlistlen - -local _sendtraffic -local _readtraffic - -local _selecttimeout -local _sleeptime - -local _starttime -local _currenttime - -local _maxsendlen -local _maxreadlen - -local _checkinterval -local _sendtimeout -local _readtimeout - -local _cleanqueue - -local _timer - -local _maxclientsperserver - -----------------------------------// DEFINITION //-- - -_server = { } -- key = port, value = table; list of listening servers -_readlist = { } -- array with sockets to read from -_sendlist = { } -- arrary with sockets to write to -_timerlist = { } -- array of timer functions -_socketlist = { } -- key = socket, value = wrapped socket (handlers) -_readtimes = { } -- key = handler, value = timestamp of last data reading -_writetimes = { } -- key = handler, value = timestamp of last data writing/sending -_closelist = { } -- handlers to close - -_readlistlen = 0 -- length of readlist -_sendlistlen = 0 -- length of sendlist -_timerlistlen = 0 -- lenght of timerlist - -_sendtraffic = 0 -- some stats -_readtraffic = 0 - -_selecttimeout = 1 -- timeout of socket.select -_sleeptime = 0 -- time to wait at the end of every loop - -_maxsendlen = 51000 * 1024 -- max len of send buffer -_maxreadlen = 25000 * 1024 -- max len of read buffer - -_checkinterval = 1200000 -- interval in secs to check idle clients -_sendtimeout = 60000 -- allowed send idle time in secs -_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs - -_cleanqueue = false -- clean bufferqueue after using - -_maxclientsperserver = 1000 - -_maxsslhandshake = 30 -- max handshake round-trips -----------------------------------// PRIVATE //-- - -wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx, maxconnections, startssl ) -- this function wraps a server - - maxconnections = maxconnections or _maxclientsperserver - - local connections = 0 - - local dispatch, disconnect = listeners.onincoming, listeners.ondisconnect - - local err - - local ssl = false - - if sslctx then - ssl = true - if not ssl_newcontext then - out_error "luasec not found" - ssl = false - end - if type( sslctx ) ~= "table" then - out_error "server.lua: wrong server sslctx" - ssl = false - end - local ctx; - ctx, err = ssl_newcontext( sslctx ) - if not ctx then - err = err or "wrong sslctx parameters" - local file; - file = err:match("^error loading (.-) %("); - if file then - if file == "private key" then - file = sslctx.key or "your private key"; - elseif file == "certificate" then - file = sslctx.certificate or "your certificate file"; - end - local reason = err:match("%((.+)%)$") or "some reason"; - if reason == "Permission denied" then - reason = "Check that the permissions allow Prosody to read this file."; - elseif reason == "No such file or directory" then - reason = "Check that the path is correct, and the file exists."; - elseif reason == "system lib" then - reason = "Previous error (see logs), or other system error."; - else - reason = "Reason: "..tostring(reason or "unknown"):lower(); - end - log("error", "SSL/TLS: Failed to load %s: %s", file, reason); - else - log("error", "SSL/TLS: Error initialising for port %d: %s", serverport, err ); - end - ssl = false - end - sslctx = ctx; - end - if not ssl then - sslctx = false; - if startssl then - log("error", "Failed to listen on port %d due to SSL/TLS to SSL/TLS initialisation errors (see logs)", serverport ) - return nil, "Cannot start ssl, see log for details" - end - end - - local accept = socket.accept - - --// public methods of the object //-- - - local handler = { } - - handler.shutdown = function( ) end - - handler.ssl = function( ) - return ssl - end - handler.sslctx = function( ) - return sslctx - end - handler.remove = function( ) - connections = connections - 1 - end - handler.close = function( ) - for _, handler in pairs( _socketlist ) do - if handler.serverport == serverport then - handler.disconnect( handler, "server closed" ) - handler:close( true ) - end - end - socket:close( ) - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _socketlist[ socket ] = nil - handler = nil - socket = nil - --mem_free( ) - out_put "server.lua: closed server handler and removed sockets from list" - end - handler.ip = function( ) - return ip - end - handler.serverport = function( ) - return serverport - end - handler.socket = function( ) - return socket - end - handler.readbuffer = function( ) - if connections > maxconnections then - out_put( "server.lua: refused new client connection: server full" ) - return false - end - local client, err = accept( socket ) -- try to accept - if client then - local ip, clientport = client:getpeername( ) - client:settimeout( 0 ) - local handler, client, err = wrapconnection( handler, listeners, client, ip, serverport, clientport, pattern, sslctx, startssl ) -- wrap new client socket - if err then -- error while wrapping ssl socket - return false - end - connections = connections + 1 - out_put( "server.lua: accepted new client connection from ", tostring(ip), ":", tostring(clientport), " to ", tostring(serverport)) - return dispatch( handler ) - elseif err then -- maybe timeout or something else - out_put( "server.lua: error with new client connection: ", tostring(err) ) - return false - end - end - return handler -end - -wrapconnection = function( server, listeners, socket, ip, serverport, clientport, pattern, sslctx, startssl ) -- this function wraps a client to a handler object - - socket:settimeout( 0 ) - - --// local import of socket methods //-- - - local send - local receive - local shutdown - - --// private closures of the object //-- - - local ssl - - local dispatch = listeners.onincoming - local status = listeners.status - local disconnect = listeners.ondisconnect - - local bufferqueue = { } -- buffer array - local bufferqueuelen = 0 -- end of buffer array - - local toclose - local fatalerror - local needtls - - local bufferlen = 0 - - local noread = false - local nosend = false - - local sendtraffic, readtraffic = 0, 0 - - local maxsendlen = _maxsendlen - local maxreadlen = _maxreadlen - - --// public methods of the object //-- - - local handler = bufferqueue -- saves a table ^_^ - - handler.dispatch = function( ) - return dispatch - end - handler.disconnect = function( ) - return disconnect - end - handler.setlistener = function( self, listeners ) - dispatch = listeners.onincoming - disconnect = listeners.ondisconnect - end - handler.getstats = function( ) - return readtraffic, sendtraffic - end - handler.ssl = function( ) - return ssl - end - handler.sslctx = function ( ) - return sslctx - end - handler.send = function( _, data, i, j ) - return send( socket, data, i, j ) - end - handler.receive = function( pattern, prefix ) - return receive( socket, pattern, prefix ) - end - handler.shutdown = function( pattern ) - return shutdown( socket, pattern ) - end - handler.close = function( self, forced ) - if not handler then return true; end - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _readtimes[ handler ] = nil - if bufferqueuelen ~= 0 then - if not ( forced or fatalerror ) then - handler.sendbuffer( ) - if bufferqueuelen ~= 0 then -- try again... - if handler then - handler.write = nil -- ... but no further writing allowed - end - toclose = true - return false - end - else - send( socket, table_concat( bufferqueue, "", 1, bufferqueuelen ), 1, bufferlen ) -- forced send - end - end - if socket then - _ = shutdown and shutdown( socket ) - socket:close( ) - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _socketlist[ socket ] = nil - socket = nil - else - out_put "server.lua: socket already closed" - end - if handler then - _writetimes[ handler ] = nil - _closelist[ handler ] = nil - handler = nil - end - if server then - server.remove( ) - end - out_put "server.lua: closed client handler and removed socket from list" - return true - end - handler.ip = function( ) - return ip - end - handler.serverport = function( ) - return serverport - end - handler.clientport = function( ) - return clientport - end - local write = function( self, data ) - bufferlen = bufferlen + string_len( data ) - if bufferlen > maxsendlen then - _closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle - handler.write = idfalse -- dont write anymore - return false - elseif socket and not _sendlist[ socket ] then - _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) - end - bufferqueuelen = bufferqueuelen + 1 - bufferqueue[ bufferqueuelen ] = data - if handler then - _writetimes[ handler ] = _writetimes[ handler ] or _currenttime - end - return true - end - handler.write = write - handler.bufferqueue = function( self ) - return bufferqueue - end - handler.socket = function( self ) - return socket - end - handler.pattern = function( self, new ) - pattern = new or pattern - return pattern - end - handler.set_send = function ( self, newsend ) - send = newsend or send - return send - end - handler.bufferlen = function( self, readlen, sendlen ) - maxsendlen = sendlen or maxsendlen - maxreadlen = readlen or maxreadlen - return bufferlen, maxreadlen, maxsendlen - end - handler.lock_read = function (self, switch) - if switch == true then - local tmp = _readlistlen - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _readtimes[ handler ] = nil - if _readlistlen ~= tmp then - noread = true - end - elseif switch == false then - if noread then - noread = false - _readlistlen = addsocket(_readlist, socket, _readlistlen) - _readtimes[ handler ] = _currenttime - end - end - return noread - end - handler.lock = function( self, switch ) - handler.lock_read (switch) - if switch == true then - handler.write = idfalse - local tmp = _sendlistlen - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _writetimes[ handler ] = nil - if _sendlistlen ~= tmp then - nosend = true - end - elseif switch == false then - handler.write = write - if nosend then - nosend = false - write( "" ) - end - end - return noread, nosend - end - local _readbuffer = function( ) -- this function reads data - local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" - if not err or (err == "wantread" or err == "timeout") or string_len(part) > 0 then -- received something - local buffer = buffer or part or "" - local len = string_len( buffer ) - if len > maxreadlen then - disconnect( handler, "receive buffer exceeded" ) - handler.close( true ) - return false - end - local count = len * STAT_UNIT - readtraffic = readtraffic + count - _readtraffic = _readtraffic + count - _readtimes[ handler ] = _currenttime - --out_put( "server.lua: read data '", buffer:gsub("[^%w%p ]", "."), "', error: ", err ) - return dispatch( handler, buffer, err ) - else -- connections was closed or fatal error - out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) ) - fatalerror = true - disconnect( handler, err ) - _ = handler and handler.close( ) - return false - end - end - local _sendbuffer = function( ) -- this function sends data - local succ, err, byte, buffer, count; - local count; - if socket then - buffer = table_concat( bufferqueue, "", 1, bufferqueuelen ) - succ, err, byte = send( socket, buffer, 1, bufferlen ) - count = ( succ or byte or 0 ) * STAT_UNIT - sendtraffic = sendtraffic + count - _sendtraffic = _sendtraffic + count - _ = _cleanqueue and clean( bufferqueue ) - --out_put( "server.lua: sended '", buffer, "', bytes: ", tostring(succ), ", error: ", tostring(err), ", part: ", tostring(byte), ", to: ", tostring(ip), ":", tostring(clientport) ) - else - succ, err, count = false, "closed", 0; - end - if succ then -- sending succesful - bufferqueuelen = 0 - bufferlen = 0 - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist - _ = needtls and handler:starttls(true) - _writetimes[ handler ] = nil - _ = toclose and handler.close( ) - return true - elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write - buffer = string_sub( buffer, byte + 1, bufferlen ) -- new buffer - bufferqueue[ 1 ] = buffer -- insert new buffer in queue - bufferqueuelen = 1 - bufferlen = bufferlen - byte - _writetimes[ handler ] = _currenttime - return true - else -- connection was closed during sending or fatal error - out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) ) - fatalerror = true - disconnect( handler, err ) - _ = handler and handler.close( ) - return false - end - end - - -- Set the sslctx - local handshake; - function handler.set_sslctx(self, new_sslctx) - ssl = true - sslctx = new_sslctx; - local wrote - local read - handshake = coroutine_wrap( function( client ) -- create handshake coroutine - local err - for i = 1, _maxsslhandshake do - _sendlistlen = ( wrote and removesocket( _sendlist, client, _sendlistlen ) ) or _sendlistlen - _readlistlen = ( read and removesocket( _readlist, client, _readlistlen ) ) or _readlistlen - read, wrote = nil, nil - _, err = client:dohandshake( ) - if not err then - out_put( "server.lua: ssl handshake done" ) - handler.readbuffer = _readbuffer -- when handshake is done, replace the handshake function with regular functions - handler.sendbuffer = _sendbuffer - _ = status and status( handler, "ssl-handshake-complete" ) - _readlistlen = addsocket(_readlist, client, _readlistlen) - return true - else - out_put( "server.lua: error during ssl handshake: ", tostring(err) ) - if err == "wantwrite" and not wrote then - _sendlistlen = addsocket(_sendlist, client, _sendlistlen) - wrote = true - elseif err == "wantread" and not read then - _readlistlen = addsocket(_readlist, client, _readlistlen) - read = true - else - break; - end - --coroutine_yield( handler, nil, err ) -- handshake not finished - coroutine_yield( ) - end - end - disconnect( handler, "ssl handshake failed" ) - _ = handler and handler:close( true ) -- forced disconnect - return false -- handshake failed - end - ) - end - if sslctx then -- ssl? - handler:set_sslctx(sslctx); - if startssl then -- ssl now? - --out_put("server.lua: ", "starting ssl handshake") - local err - socket, err = ssl_wrap( socket, sslctx ) -- wrap socket - if err then - out_put( "server.lua: ssl error: ", tostring(err) ) - --mem_free( ) - return nil, nil, err -- fatal error - end - socket:settimeout( 0 ) - handler.readbuffer = handshake - handler.sendbuffer = handshake - handshake( socket ) -- do handshake - if not socket then - return nil, nil, "ssl handshake failed"; - end - else - -- We're not automatically doing SSL, so we're not secure (yet) - ssl = false - handler.starttls = function( self, now ) - if not now then - --out_put "server.lua: we need to do tls, but delaying until later" - needtls = true - return - end - --out_put( "server.lua: attempting to start tls on " .. tostring( socket ) ) - local oldsocket, err = socket - socket, err = ssl_wrap( socket, sslctx ) -- wrap socket - --out_put( "server.lua: sslwrapped socket is " .. tostring( socket ) ) - if err then - out_put( "server.lua: error while starting tls on client: ", tostring(err) ) - return nil, err -- fatal error - end - - socket:settimeout( 0 ) - - -- add the new socket to our system - - send = socket.send - receive = socket.receive - shutdown = id - - _socketlist[ socket ] = handler - _readlistlen = addsocket(_readlist, socket, _readlistlen) - - -- remove traces of the old socket - - _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) - _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) - _socketlist[ oldsocket ] = nil - - handler.starttls = nil - needtls = nil - - -- Secure now - ssl = true - - handler.readbuffer = handshake - handler.sendbuffer = handshake - handshake( socket ) -- do handshake - end - handler.readbuffer = _readbuffer - handler.sendbuffer = _sendbuffer - end - else -- normal connection - ssl = false - handler.readbuffer = _readbuffer - handler.sendbuffer = _sendbuffer - end - - send = socket.send - receive = socket.receive - shutdown = ( ssl and id ) or socket.shutdown - - _socketlist[ socket ] = handler - _readlistlen = addsocket(_readlist, socket, _readlistlen) - - return handler, socket -end - -id = function( ) -end - -idfalse = function( ) - return false -end - -addsocket = function( list, socket, len ) - if not list[ socket ] then - len = len + 1 - list[ len ] = socket - list[ socket ] = len - end - return len; -end - -removesocket = function( list, socket, len ) -- this function removes sockets from a list ( copied from copas ) - local pos = list[ socket ] - if pos then - list[ socket ] = nil - local last = list[ len ] - list[ len ] = nil - if last ~= socket then - list[ last ] = pos - list[ pos ] = last - end - return len - 1 - end - return len -end - -closesocket = function( socket ) - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _socketlist[ socket ] = nil - socket:close( ) - --mem_free( ) -end - -----------------------------------// PUBLIC //-- - -addserver = function( addr, port, listeners, pattern, sslctx, startssl ) -- this function provides a way for other scripts to reg a server - local err - --out_put("server.lua: autossl on ", port, " is ", startssl) - if type( listeners ) ~= "table" then - err = "invalid listener table" - end - if not type( port ) == "number" or not ( port >= 0 and port <= 65535 ) then - err = "invalid port" - elseif _server[ port ] then - err = "listeners on port '" .. port .. "' already exist" - elseif sslctx and not luasec then - err = "luasec not found" - end - if err then - out_error( "server.lua, port ", port, ": ", err ) - return nil, err - end - addr = addr or "*" - local server, err = socket_bind( addr, port ) - if err then - out_error( "server.lua, port ", port, ": ", err ) - return nil, err - end - local handler, err = wrapserver( listeners, server, addr, port, pattern, sslctx, _maxclientsperserver, startssl ) -- wrap new server socket - if not handler then - server:close( ) - return nil, err - end - server:settimeout( 0 ) - _readlistlen = addsocket(_readlist, server, _readlistlen) - _server[ port ] = handler - _socketlist[ server ] = handler - out_put( "server.lua: new server listener on '", addr, ":", port, "'" ) - return handler -end - -getserver = function ( port ) - return _server[ port ]; -end - -removeserver = function( port ) - local handler = _server[ port ] - if not handler then - return nil, "no server found on port '" .. tostring( port ) .. "'" - end - handler:close( ) - _server[ port ] = nil - return true -end - -closeall = function( ) - for _, handler in pairs( _socketlist ) do - handler:close( ) - _socketlist[ _ ] = nil - end - _readlistlen = 0 - _sendlistlen = 0 - _timerlistlen = 0 - _server = { } - _readlist = { } - _sendlist = { } - _timerlist = { } - _socketlist = { } - --mem_free( ) -end - -getsettings = function( ) - return _selecttimeout, _sleeptime, _maxsendlen, _maxreadlen, _checkinterval, _sendtimeout, _readtimeout, _cleanqueue, _maxclientsperserver, _maxsslhandshake -end - -changesettings = function( new ) - if type( new ) ~= "table" then - return nil, "invalid settings table" - end - _selecttimeout = tonumber( new.timeout ) or _selecttimeout - _sleeptime = tonumber( new.sleeptime ) or _sleeptime - _maxsendlen = tonumber( new.maxsendlen ) or _maxsendlen - _maxreadlen = tonumber( new.maxreadlen ) or _maxreadlen - _checkinterval = tonumber( new.checkinterval ) or _checkinterval - _sendtimeout = tonumber( new.sendtimeout ) or _sendtimeout - _readtimeout = tonumber( new.readtimeout ) or _readtimeout - _cleanqueue = new.cleanqueue - _maxclientsperserver = new._maxclientsperserver or _maxclientsperserver - _maxsslhandshake = new._maxsslhandshake or _maxsslhandshake - return true -end - -addtimer = function( listener ) - if type( listener ) ~= "function" then - return nil, "invalid listener function" - end - _timerlistlen = _timerlistlen + 1 - _timerlist[ _timerlistlen ] = listener - return true -end - -stats = function( ) - return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen -end - -local dontstop = true; -- thinking about tomorrow, ... - -setquitting = function (quit) - dontstop = not quit; - return; -end - -loop = function( ) -- this is the main loop of the program - while dontstop do - local read, write, err = socket_select( _readlist, _sendlist, _selecttimeout ) - for i, socket in ipairs( write ) do -- send data waiting in writequeues - local handler = _socketlist[ socket ] - if handler then - handler.sendbuffer( ) - else - closesocket( socket ) - out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen - end - end - for i, socket in ipairs( read ) do -- receive data - local handler = _socketlist[ socket ] - if handler then - handler.readbuffer( ) - else - closesocket( socket ) - out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen - end - end - for handler, err in pairs( _closelist ) do - handler.disconnect( )( handler, err ) - handler:close( true ) -- forced disconnect - end - clean( _closelist ) - _currenttime = os_time( ) - if os_difftime( _currenttime - _timer ) >= 1 then - for i = 1, _timerlistlen do - _timerlist[ i ]( _currenttime ) -- fire timers - end - _timer = _currenttime - end - socket_sleep( _sleeptime ) -- wait some time - --collectgarbage( ) - end - return "quitting" -end - ---// EXPERIMENTAL //-- - -local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx, startssl ) - local handler = wrapconnection( nil, listeners, socket, ip, serverport, "clientport", pattern, sslctx, startssl ) - _socketlist[ socket ] = handler - _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) - return handler, socket -end - -local addclient = function( address, port, listeners, pattern, sslctx, startssl ) - local client, err = luasocket.tcp( ) - if err then - return nil, err - end - client:settimeout( 0 ) - _, err = client:connect( address, port ) - if err then -- try again - local handler = wrapclient( client, address, port, listeners ) - else - wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx, startssl ) - end -end - ---// EXPERIMENTAL //-- - -----------------------------------// BEGIN //-- - -use "setmetatable" ( _socketlist, { __mode = "k" } ) -use "setmetatable" ( _readtimes, { __mode = "k" } ) -use "setmetatable" ( _writetimes, { __mode = "k" } ) - -_timer = os_time( ) -_starttime = os_time( ) - -addtimer( function( ) - local difftime = os_difftime( _currenttime - _starttime ) - if difftime > _checkinterval then - _starttime = _currenttime - for handler, timestamp in pairs( _writetimes ) do - if os_difftime( _currenttime - timestamp ) > _sendtimeout then - --_writetimes[ handler ] = nil - handler.disconnect( )( handler, "send timeout" ) - handler:close( true ) -- forced disconnect - end - end - for handler, timestamp in pairs( _readtimes ) do - if os_difftime( _currenttime - timestamp ) > _readtimeout then - --_readtimes[ handler ] = nil - handler.disconnect( )( handler, "read timeout" ) - handler:close( ) -- forced disconnect? - end - end - end - end -) - -----------------------------------// PUBLIC INTERFACE //-- - -return { - - addclient = addclient, - wrapclient = wrapclient, - - loop = loop, - stats = stats, - closeall = closeall, - addtimer = addtimer, - addserver = addserver, - getserver = getserver, - getsettings = getsettings, - setquitting = setquitting, - removeserver = removeserver, - changesettings = changesettings, -} +-- +-- server.lua by blastbeat of the luadch project +-- Re-used here under the MIT/X Consortium License +-- +-- Modifications (C) 2008-2009 Matthew Wild, Waqas Hussain +-- + +-- // wrapping luadch stuff // -- + +local use = function( what ) + return _G[ what ] +end +local clean = function( tbl ) + for i, k in pairs( tbl ) do + tbl[ i ] = nil + end +end + +local log, table_concat = require ("util.logger").init("socket"), table.concat; +local out_put = function (...) return log("debug", table_concat{...}); end +local out_error = function (...) return log("warn", table_concat{...}); end +local mem_free = collectgarbage + +----------------------------------// DECLARATION //-- + +--// constants //-- + +local STAT_UNIT = 1 -- byte + +--// lua functions //-- + +local type = use "type" +local pairs = use "pairs" +local ipairs = use "ipairs" +local tostring = use "tostring" +local collectgarbage = use "collectgarbage" + +--// lua libs //-- + +local os = use "os" +local table = use "table" +local string = use "string" +local coroutine = use "coroutine" + +--// lua lib methods //-- + +local os_time = os.time +local os_difftime = os.difftime +local table_concat = table.concat +local table_remove = table.remove +local string_len = string.len +local string_sub = string.sub +local coroutine_wrap = coroutine.wrap +local coroutine_yield = coroutine.yield + +--// extern libs //-- + +local luasec = select( 2, pcall( require, "ssl" ) ) +local luasocket = require "socket" + +--// extern lib methods //-- + +local ssl_wrap = ( luasec and luasec.wrap ) +local socket_bind = luasocket.bind +local socket_sleep = luasocket.sleep +local socket_select = luasocket.select +local ssl_newcontext = ( luasec and luasec.newcontext ) + +--// functions //-- + +local id +local loop +local stats +local idfalse +local addtimer +local closeall +local addserver +local getserver +local wrapserver +local getsettings +local closesocket +local removesocket +local removeserver +local changetimeout +local wrapconnection +local changesettings + +--// tables //-- + +local _server +local _readlist +local _timerlist +local _sendlist +local _socketlist +local _closelist +local _readtimes +local _writetimes + +--// simple data types //-- + +local _ +local _readlistlen +local _sendlistlen +local _timerlistlen + +local _sendtraffic +local _readtraffic + +local _selecttimeout +local _sleeptime + +local _starttime +local _currenttime + +local _maxsendlen +local _maxreadlen + +local _checkinterval +local _sendtimeout +local _readtimeout + +local _cleanqueue + +local _timer + +local _maxclientsperserver + +----------------------------------// DEFINITION //-- + +_server = { } -- key = port, value = table; list of listening servers +_readlist = { } -- array with sockets to read from +_sendlist = { } -- arrary with sockets to write to +_timerlist = { } -- array of timer functions +_socketlist = { } -- key = socket, value = wrapped socket (handlers) +_readtimes = { } -- key = handler, value = timestamp of last data reading +_writetimes = { } -- key = handler, value = timestamp of last data writing/sending +_closelist = { } -- handlers to close + +_readlistlen = 0 -- length of readlist +_sendlistlen = 0 -- length of sendlist +_timerlistlen = 0 -- lenght of timerlist + +_sendtraffic = 0 -- some stats +_readtraffic = 0 + +_selecttimeout = 1 -- timeout of socket.select +_sleeptime = 0 -- time to wait at the end of every loop + +_maxsendlen = 51000 * 1024 -- max len of send buffer +_maxreadlen = 25000 * 1024 -- max len of read buffer + +_checkinterval = 1200000 -- interval in secs to check idle clients +_sendtimeout = 60000 -- allowed send idle time in secs +_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs + +_cleanqueue = false -- clean bufferqueue after using + +_maxclientsperserver = 1000 + +_maxsslhandshake = 30 -- max handshake round-trips +----------------------------------// PRIVATE //-- + +wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx, maxconnections, startssl ) -- this function wraps a server + + maxconnections = maxconnections or _maxclientsperserver + + local connections = 0 + + local dispatch, disconnect = listeners.onincoming, listeners.ondisconnect + + local err + + local ssl = false + + if sslctx then + ssl = true + if not ssl_newcontext then + out_error "luasec not found" + ssl = false + end + if type( sslctx ) ~= "table" then + out_error "server.lua: wrong server sslctx" + ssl = false + end + local ctx; + ctx, err = ssl_newcontext( sslctx ) + if not ctx then + err = err or "wrong sslctx parameters" + local file; + file = err:match("^error loading (.-) %("); + if file then + if file == "private key" then + file = sslctx.key or "your private key"; + elseif file == "certificate" then + file = sslctx.certificate or "your certificate file"; + end + local reason = err:match("%((.+)%)$") or "some reason"; + if reason == "Permission denied" then + reason = "Check that the permissions allow Prosody to read this file."; + elseif reason == "No such file or directory" then + reason = "Check that the path is correct, and the file exists."; + elseif reason == "system lib" then + reason = "Previous error (see logs), or other system error."; + else + reason = "Reason: "..tostring(reason or "unknown"):lower(); + end + log("error", "SSL/TLS: Failed to load %s: %s", file, reason); + else + log("error", "SSL/TLS: Error initialising for port %d: %s", serverport, err ); + end + ssl = false + end + sslctx = ctx; + end + if not ssl then + sslctx = false; + if startssl then + log("error", "Failed to listen on port %d due to SSL/TLS to SSL/TLS initialisation errors (see logs)", serverport ) + return nil, "Cannot start ssl, see log for details" + end + end + + local accept = socket.accept + + --// public methods of the object //-- + + local handler = { } + + handler.shutdown = function( ) end + + handler.ssl = function( ) + return ssl + end + handler.sslctx = function( ) + return sslctx + end + handler.remove = function( ) + connections = connections - 1 + end + handler.close = function( ) + for _, handler in pairs( _socketlist ) do + if handler.serverport == serverport then + handler.disconnect( handler, "server closed" ) + handler:close( true ) + end + end + socket:close( ) + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _socketlist[ socket ] = nil + handler = nil + socket = nil + --mem_free( ) + out_put "server.lua: closed server handler and removed sockets from list" + end + handler.ip = function( ) + return ip + end + handler.serverport = function( ) + return serverport + end + handler.socket = function( ) + return socket + end + handler.readbuffer = function( ) + if connections > maxconnections then + out_put( "server.lua: refused new client connection: server full" ) + return false + end + local client, err = accept( socket ) -- try to accept + if client then + local ip, clientport = client:getpeername( ) + client:settimeout( 0 ) + local handler, client, err = wrapconnection( handler, listeners, client, ip, serverport, clientport, pattern, sslctx, startssl ) -- wrap new client socket + if err then -- error while wrapping ssl socket + return false + end + connections = connections + 1 + out_put( "server.lua: accepted new client connection from ", tostring(ip), ":", tostring(clientport), " to ", tostring(serverport)) + return dispatch( handler ) + elseif err then -- maybe timeout or something else + out_put( "server.lua: error with new client connection: ", tostring(err) ) + return false + end + end + return handler +end + +wrapconnection = function( server, listeners, socket, ip, serverport, clientport, pattern, sslctx, startssl ) -- this function wraps a client to a handler object + + socket:settimeout( 0 ) + + --// local import of socket methods //-- + + local send + local receive + local shutdown + + --// private closures of the object //-- + + local ssl + + local dispatch = listeners.onincoming + local status = listeners.status + local disconnect = listeners.ondisconnect + + local bufferqueue = { } -- buffer array + local bufferqueuelen = 0 -- end of buffer array + + local toclose + local fatalerror + local needtls + + local bufferlen = 0 + + local noread = false + local nosend = false + + local sendtraffic, readtraffic = 0, 0 + + local maxsendlen = _maxsendlen + local maxreadlen = _maxreadlen + + --// public methods of the object //-- + + local handler = bufferqueue -- saves a table ^_^ + + handler.dispatch = function( ) + return dispatch + end + handler.disconnect = function( ) + return disconnect + end + handler.setlistener = function( self, listeners ) + dispatch = listeners.onincoming + disconnect = listeners.ondisconnect + end + handler.getstats = function( ) + return readtraffic, sendtraffic + end + handler.ssl = function( ) + return ssl + end + handler.sslctx = function ( ) + return sslctx + end + handler.send = function( _, data, i, j ) + return send( socket, data, i, j ) + end + handler.receive = function( pattern, prefix ) + return receive( socket, pattern, prefix ) + end + handler.shutdown = function( pattern ) + return shutdown( socket, pattern ) + end + handler.close = function( self, forced ) + if not handler then return true; end + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _readtimes[ handler ] = nil + if bufferqueuelen ~= 0 then + if not ( forced or fatalerror ) then + handler.sendbuffer( ) + if bufferqueuelen ~= 0 then -- try again... + if handler then + handler.write = nil -- ... but no further writing allowed + end + toclose = true + return false + end + else + send( socket, table_concat( bufferqueue, "", 1, bufferqueuelen ), 1, bufferlen ) -- forced send + end + end + if socket then + _ = shutdown and shutdown( socket ) + socket:close( ) + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _socketlist[ socket ] = nil + socket = nil + else + out_put "server.lua: socket already closed" + end + if handler then + _writetimes[ handler ] = nil + _closelist[ handler ] = nil + handler = nil + end + if server then + server.remove( ) + end + out_put "server.lua: closed client handler and removed socket from list" + return true + end + handler.ip = function( ) + return ip + end + handler.serverport = function( ) + return serverport + end + handler.clientport = function( ) + return clientport + end + local write = function( self, data ) + bufferlen = bufferlen + string_len( data ) + if bufferlen > maxsendlen then + _closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle + handler.write = idfalse -- dont write anymore + return false + elseif socket and not _sendlist[ socket ] then + _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) + end + bufferqueuelen = bufferqueuelen + 1 + bufferqueue[ bufferqueuelen ] = data + if handler then + _writetimes[ handler ] = _writetimes[ handler ] or _currenttime + end + return true + end + handler.write = write + handler.bufferqueue = function( self ) + return bufferqueue + end + handler.socket = function( self ) + return socket + end + handler.pattern = function( self, new ) + pattern = new or pattern + return pattern + end + handler.set_send = function ( self, newsend ) + send = newsend or send + return send + end + handler.bufferlen = function( self, readlen, sendlen ) + maxsendlen = sendlen or maxsendlen + maxreadlen = readlen or maxreadlen + return bufferlen, maxreadlen, maxsendlen + end + handler.lock_read = function (self, switch) + if switch == true then + local tmp = _readlistlen + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _readtimes[ handler ] = nil + if _readlistlen ~= tmp then + noread = true + end + elseif switch == false then + if noread then + noread = false + _readlistlen = addsocket(_readlist, socket, _readlistlen) + _readtimes[ handler ] = _currenttime + end + end + return noread + end + handler.lock = function( self, switch ) + handler.lock_read (switch) + if switch == true then + handler.write = idfalse + local tmp = _sendlistlen + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _writetimes[ handler ] = nil + if _sendlistlen ~= tmp then + nosend = true + end + elseif switch == false then + handler.write = write + if nosend then + nosend = false + write( "" ) + end + end + return noread, nosend + end + local _readbuffer = function( ) -- this function reads data + local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" + if not err or (err == "wantread" or err == "timeout") or string_len(part) > 0 then -- received something + local buffer = buffer or part or "" + local len = string_len( buffer ) + if len > maxreadlen then + disconnect( handler, "receive buffer exceeded" ) + handler.close( true ) + return false + end + local count = len * STAT_UNIT + readtraffic = readtraffic + count + _readtraffic = _readtraffic + count + _readtimes[ handler ] = _currenttime + --out_put( "server.lua: read data '", buffer:gsub("[^%w%p ]", "."), "', error: ", err ) + return dispatch( handler, buffer, err ) + else -- connections was closed or fatal error + out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) ) + fatalerror = true + disconnect( handler, err ) + _ = handler and handler.close( ) + return false + end + end + local _sendbuffer = function( ) -- this function sends data + local succ, err, byte, buffer, count; + local count; + if socket then + buffer = table_concat( bufferqueue, "", 1, bufferqueuelen ) + succ, err, byte = send( socket, buffer, 1, bufferlen ) + count = ( succ or byte or 0 ) * STAT_UNIT + sendtraffic = sendtraffic + count + _sendtraffic = _sendtraffic + count + _ = _cleanqueue and clean( bufferqueue ) + --out_put( "server.lua: sended '", buffer, "', bytes: ", tostring(succ), ", error: ", tostring(err), ", part: ", tostring(byte), ", to: ", tostring(ip), ":", tostring(clientport) ) + else + succ, err, count = false, "closed", 0; + end + if succ then -- sending succesful + bufferqueuelen = 0 + bufferlen = 0 + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist + _ = needtls and handler:starttls(true) + _writetimes[ handler ] = nil + _ = toclose and handler.close( ) + return true + elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write + buffer = string_sub( buffer, byte + 1, bufferlen ) -- new buffer + bufferqueue[ 1 ] = buffer -- insert new buffer in queue + bufferqueuelen = 1 + bufferlen = bufferlen - byte + _writetimes[ handler ] = _currenttime + return true + else -- connection was closed during sending or fatal error + out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) ) + fatalerror = true + disconnect( handler, err ) + _ = handler and handler.close( ) + return false + end + end + + -- Set the sslctx + local handshake; + function handler.set_sslctx(self, new_sslctx) + ssl = true + sslctx = new_sslctx; + local wrote + local read + handshake = coroutine_wrap( function( client ) -- create handshake coroutine + local err + for i = 1, _maxsslhandshake do + _sendlistlen = ( wrote and removesocket( _sendlist, client, _sendlistlen ) ) or _sendlistlen + _readlistlen = ( read and removesocket( _readlist, client, _readlistlen ) ) or _readlistlen + read, wrote = nil, nil + _, err = client:dohandshake( ) + if not err then + out_put( "server.lua: ssl handshake done" ) + handler.readbuffer = _readbuffer -- when handshake is done, replace the handshake function with regular functions + handler.sendbuffer = _sendbuffer + _ = status and status( handler, "ssl-handshake-complete" ) + _readlistlen = addsocket(_readlist, client, _readlistlen) + return true + else + out_put( "server.lua: error during ssl handshake: ", tostring(err) ) + if err == "wantwrite" and not wrote then + _sendlistlen = addsocket(_sendlist, client, _sendlistlen) + wrote = true + elseif err == "wantread" and not read then + _readlistlen = addsocket(_readlist, client, _readlistlen) + read = true + else + break; + end + --coroutine_yield( handler, nil, err ) -- handshake not finished + coroutine_yield( ) + end + end + disconnect( handler, "ssl handshake failed" ) + _ = handler and handler:close( true ) -- forced disconnect + return false -- handshake failed + end + ) + end + if sslctx then -- ssl? + handler:set_sslctx(sslctx); + if startssl then -- ssl now? + --out_put("server.lua: ", "starting ssl handshake") + local err + socket, err = ssl_wrap( socket, sslctx ) -- wrap socket + if err then + out_put( "server.lua: ssl error: ", tostring(err) ) + --mem_free( ) + return nil, nil, err -- fatal error + end + socket:settimeout( 0 ) + handler.readbuffer = handshake + handler.sendbuffer = handshake + handshake( socket ) -- do handshake + if not socket then + return nil, nil, "ssl handshake failed"; + end + else + -- We're not automatically doing SSL, so we're not secure (yet) + ssl = false + handler.starttls = function( self, now ) + if not now then + --out_put "server.lua: we need to do tls, but delaying until later" + needtls = true + return + end + --out_put( "server.lua: attempting to start tls on " .. tostring( socket ) ) + local oldsocket, err = socket + socket, err = ssl_wrap( socket, sslctx ) -- wrap socket + --out_put( "server.lua: sslwrapped socket is " .. tostring( socket ) ) + if err then + out_put( "server.lua: error while starting tls on client: ", tostring(err) ) + return nil, err -- fatal error + end + + socket:settimeout( 0 ) + + -- add the new socket to our system + + send = socket.send + receive = socket.receive + shutdown = id + + _socketlist[ socket ] = handler + _readlistlen = addsocket(_readlist, socket, _readlistlen) + + -- remove traces of the old socket + + _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) + _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) + _socketlist[ oldsocket ] = nil + + handler.starttls = nil + needtls = nil + + -- Secure now + ssl = true + + handler.readbuffer = handshake + handler.sendbuffer = handshake + handshake( socket ) -- do handshake + end + handler.readbuffer = _readbuffer + handler.sendbuffer = _sendbuffer + end + else -- normal connection + ssl = false + handler.readbuffer = _readbuffer + handler.sendbuffer = _sendbuffer + end + + send = socket.send + receive = socket.receive + shutdown = ( ssl and id ) or socket.shutdown + + _socketlist[ socket ] = handler + _readlistlen = addsocket(_readlist, socket, _readlistlen) + + return handler, socket +end + +id = function( ) +end + +idfalse = function( ) + return false +end + +addsocket = function( list, socket, len ) + if not list[ socket ] then + len = len + 1 + list[ len ] = socket + list[ socket ] = len + end + return len; +end + +removesocket = function( list, socket, len ) -- this function removes sockets from a list ( copied from copas ) + local pos = list[ socket ] + if pos then + list[ socket ] = nil + local last = list[ len ] + list[ len ] = nil + if last ~= socket then + list[ last ] = pos + list[ pos ] = last + end + return len - 1 + end + return len +end + +closesocket = function( socket ) + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _socketlist[ socket ] = nil + socket:close( ) + --mem_free( ) +end + +----------------------------------// PUBLIC //-- + +addserver = function( addr, port, listeners, pattern, sslctx, startssl ) -- this function provides a way for other scripts to reg a server + local err + --out_put("server.lua: autossl on ", port, " is ", startssl) + if type( listeners ) ~= "table" then + err = "invalid listener table" + end + if not type( port ) == "number" or not ( port >= 0 and port <= 65535 ) then + err = "invalid port" + elseif _server[ port ] then + err = "listeners on port '" .. port .. "' already exist" + elseif sslctx and not luasec then + err = "luasec not found" + end + if err then + out_error( "server.lua, port ", port, ": ", err ) + return nil, err + end + addr = addr or "*" + local server, err = socket_bind( addr, port ) + if err then + out_error( "server.lua, port ", port, ": ", err ) + return nil, err + end + local handler, err = wrapserver( listeners, server, addr, port, pattern, sslctx, _maxclientsperserver, startssl ) -- wrap new server socket + if not handler then + server:close( ) + return nil, err + end + server:settimeout( 0 ) + _readlistlen = addsocket(_readlist, server, _readlistlen) + _server[ port ] = handler + _socketlist[ server ] = handler + out_put( "server.lua: new server listener on '", addr, ":", port, "'" ) + return handler +end + +getserver = function ( port ) + return _server[ port ]; +end + +removeserver = function( port ) + local handler = _server[ port ] + if not handler then + return nil, "no server found on port '" .. tostring( port ) .. "'" + end + handler:close( ) + _server[ port ] = nil + return true +end + +closeall = function( ) + for _, handler in pairs( _socketlist ) do + handler:close( ) + _socketlist[ _ ] = nil + end + _readlistlen = 0 + _sendlistlen = 0 + _timerlistlen = 0 + _server = { } + _readlist = { } + _sendlist = { } + _timerlist = { } + _socketlist = { } + --mem_free( ) +end + +getsettings = function( ) + return _selecttimeout, _sleeptime, _maxsendlen, _maxreadlen, _checkinterval, _sendtimeout, _readtimeout, _cleanqueue, _maxclientsperserver, _maxsslhandshake +end + +changesettings = function( new ) + if type( new ) ~= "table" then + return nil, "invalid settings table" + end + _selecttimeout = tonumber( new.timeout ) or _selecttimeout + _sleeptime = tonumber( new.sleeptime ) or _sleeptime + _maxsendlen = tonumber( new.maxsendlen ) or _maxsendlen + _maxreadlen = tonumber( new.maxreadlen ) or _maxreadlen + _checkinterval = tonumber( new.checkinterval ) or _checkinterval + _sendtimeout = tonumber( new.sendtimeout ) or _sendtimeout + _readtimeout = tonumber( new.readtimeout ) or _readtimeout + _cleanqueue = new.cleanqueue + _maxclientsperserver = new._maxclientsperserver or _maxclientsperserver + _maxsslhandshake = new._maxsslhandshake or _maxsslhandshake + return true +end + +addtimer = function( listener ) + if type( listener ) ~= "function" then + return nil, "invalid listener function" + end + _timerlistlen = _timerlistlen + 1 + _timerlist[ _timerlistlen ] = listener + return true +end + +stats = function( ) + return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen +end + +local dontstop = true; -- thinking about tomorrow, ... + +setquitting = function (quit) + dontstop = not quit; + return; +end + +loop = function( ) -- this is the main loop of the program + while dontstop do + local read, write, err = socket_select( _readlist, _sendlist, _selecttimeout ) + for i, socket in ipairs( write ) do -- send data waiting in writequeues + local handler = _socketlist[ socket ] + if handler then + handler.sendbuffer( ) + else + closesocket( socket ) + out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen + end + end + for i, socket in ipairs( read ) do -- receive data + local handler = _socketlist[ socket ] + if handler then + handler.readbuffer( ) + else + closesocket( socket ) + out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen + end + end + for handler, err in pairs( _closelist ) do + handler.disconnect( )( handler, err ) + handler:close( true ) -- forced disconnect + end + clean( _closelist ) + _currenttime = os_time( ) + if os_difftime( _currenttime - _timer ) >= 1 then + for i = 1, _timerlistlen do + _timerlist[ i ]( _currenttime ) -- fire timers + end + _timer = _currenttime + end + socket_sleep( _sleeptime ) -- wait some time + --collectgarbage( ) + end + return "quitting" +end + +--// EXPERIMENTAL //-- + +local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx, startssl ) + local handler = wrapconnection( nil, listeners, socket, ip, serverport, "clientport", pattern, sslctx, startssl ) + _socketlist[ socket ] = handler + _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) + return handler, socket +end + +local addclient = function( address, port, listeners, pattern, sslctx, startssl ) + local client, err = luasocket.tcp( ) + if err then + return nil, err + end + client:settimeout( 0 ) + _, err = client:connect( address, port ) + if err then -- try again + local handler = wrapclient( client, address, port, listeners ) + else + wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx, startssl ) + end +end + +--// EXPERIMENTAL //-- + +----------------------------------// BEGIN //-- + +use "setmetatable" ( _socketlist, { __mode = "k" } ) +use "setmetatable" ( _readtimes, { __mode = "k" } ) +use "setmetatable" ( _writetimes, { __mode = "k" } ) + +_timer = os_time( ) +_starttime = os_time( ) + +addtimer( function( ) + local difftime = os_difftime( _currenttime - _starttime ) + if difftime > _checkinterval then + _starttime = _currenttime + for handler, timestamp in pairs( _writetimes ) do + if os_difftime( _currenttime - timestamp ) > _sendtimeout then + --_writetimes[ handler ] = nil + handler.disconnect( )( handler, "send timeout" ) + handler:close( true ) -- forced disconnect + end + end + for handler, timestamp in pairs( _readtimes ) do + if os_difftime( _currenttime - timestamp ) > _readtimeout then + --_readtimes[ handler ] = nil + handler.disconnect( )( handler, "read timeout" ) + handler:close( ) -- forced disconnect? + end + end + end + end +) + +----------------------------------// PUBLIC INTERFACE //-- + +return { + + addclient = addclient, + wrapclient = wrapclient, + + loop = loop, + stats = stats, + closeall = closeall, + addtimer = addtimer, + addserver = addserver, + getserver = getserver, + getsettings = getsettings, + setquitting = setquitting, + removeserver = removeserver, + changesettings = changesettings, +}