Software /
code /
prosody
File
net/server_select.lua @ 3597:8090880f0e18
hostmanager, componentmanager: hostmanager now handles component initialization at server start, not componentmanager.
author | Waqas Hussain <waqas20@gmail.com> |
---|---|
date | Wed, 10 Nov 2010 19:57:35 +0500 |
parent | 3543:90b21508ac27 |
child | 3682:d12141cbbaa0 |
line wrap: on
line source
-- -- server.lua by blastbeat of the luadch project -- Re-used here under the MIT/X Consortium License -- -- Modifications (C) 2008-2010 Matthew Wild, Waqas Hussain -- -- // wrapping luadch stuff // -- local use = function( what ) return _G[ what ] end local clean = function( tbl ) for i, k in pairs( tbl ) do tbl[ i ] = nil end end local log, table_concat = require ("util.logger").init("socket"), table.concat; local out_put = function (...) return log("debug", table_concat{...}); end local out_error = function (...) return log("warn", table_concat{...}); end local mem_free = collectgarbage ----------------------------------// DECLARATION //-- --// constants //-- local STAT_UNIT = 1 -- byte --// lua functions //-- local type = use "type" local pairs = use "pairs" local ipairs = use "ipairs" local tostring = use "tostring" local collectgarbage = use "collectgarbage" --// lua libs //-- local os = use "os" local table = use "table" local string = use "string" local coroutine = use "coroutine" --// lua lib methods //-- local os_time = os.time local os_difftime = os.difftime local table_concat = table.concat local table_remove = table.remove local string_len = string.len local string_sub = string.sub local coroutine_wrap = coroutine.wrap local coroutine_yield = coroutine.yield --// extern libs //-- local luasec = use "ssl" local luasocket = use "socket" or require "socket" --// extern lib methods //-- local ssl_wrap = ( luasec and luasec.wrap ) local socket_bind = luasocket.bind local socket_sleep = luasocket.sleep local socket_select = luasocket.select local ssl_newcontext = ( luasec and luasec.newcontext ) --// functions //-- local id local loop local stats local idfalse local addtimer local closeall local addserver local getserver local wrapserver local getsettings local closesocket local removesocket local removeserver local changetimeout local wrapconnection local changesettings --// tables //-- local _server local _readlist local _timerlist local _sendlist local _socketlist local _closelist local _readtimes local _writetimes --// simple data types //-- local _ local _readlistlen local _sendlistlen local _timerlistlen local _sendtraffic local _readtraffic local _selecttimeout local _sleeptime local _starttime local _currenttime local _maxsendlen local _maxreadlen local _checkinterval local _sendtimeout local _readtimeout local _cleanqueue local _timer local _maxclientsperserver ----------------------------------// DEFINITION //-- _server = { } -- key = port, value = table; list of listening servers _readlist = { } -- array with sockets to read from _sendlist = { } -- arrary with sockets to write to _timerlist = { } -- array of timer functions _socketlist = { } -- key = socket, value = wrapped socket (handlers) _readtimes = { } -- key = handler, value = timestamp of last data reading _writetimes = { } -- key = handler, value = timestamp of last data writing/sending _closelist = { } -- handlers to close _readlistlen = 0 -- length of readlist _sendlistlen = 0 -- length of sendlist _timerlistlen = 0 -- lenght of timerlist _sendtraffic = 0 -- some stats _readtraffic = 0 _selecttimeout = 1 -- timeout of socket.select _sleeptime = 0 -- time to wait at the end of every loop _maxsendlen = 51000 * 1024 -- max len of send buffer _maxreadlen = 25000 * 1024 -- max len of read buffer _checkinterval = 1200000 -- interval in secs to check idle clients _sendtimeout = 60000 -- allowed send idle time in secs _readtimeout = 6 * 60 * 60 -- allowed read idle time in secs _cleanqueue = false -- clean bufferqueue after using _maxclientsperserver = 1000 _maxsslhandshake = 30 -- max handshake round-trips ----------------------------------// PRIVATE //-- wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx, maxconnections ) -- this function wraps a server maxconnections = maxconnections or _maxclientsperserver local connections = 0 local dispatch, disconnect = listeners.onconnect or listeners.onincoming, listeners.ondisconnect local accept = socket.accept --// public methods of the object //-- local handler = { } handler.shutdown = function( ) end handler.ssl = function( ) return sslctx ~= nil end handler.sslctx = function( ) return sslctx end handler.remove = function( ) connections = connections - 1 end handler.close = function( ) for _, handler in pairs( _socketlist ) do if handler.serverport == serverport then handler.disconnect( handler, "server closed" ) handler:close( true ) end end socket:close( ) _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) _readlistlen = removesocket( _readlist, socket, _readlistlen ) _socketlist[ socket ] = nil handler = nil socket = nil --mem_free( ) out_put "server.lua: closed server handler and removed sockets from list" end handler.ip = function( ) return ip end handler.serverport = function( ) return serverport end handler.socket = function( ) return socket end handler.readbuffer = function( ) if connections > maxconnections then out_put( "server.lua: refused new client connection: server full" ) return false end local client, err = accept( socket ) -- try to accept if client then local ip, clientport = client:getpeername( ) client:settimeout( 0 ) local handler, client, err = wrapconnection( handler, listeners, client, ip, serverport, clientport, pattern, sslctx ) -- wrap new client socket if err then -- error while wrapping ssl socket return false end connections = connections + 1 out_put( "server.lua: accepted new client connection from ", tostring(ip), ":", tostring(clientport), " to ", tostring(serverport)) return dispatch( handler ) elseif err then -- maybe timeout or something else out_put( "server.lua: error with new client connection: ", tostring(err) ) return false end end return handler end wrapconnection = function( server, listeners, socket, ip, serverport, clientport, pattern, sslctx ) -- this function wraps a client to a handler object socket:settimeout( 0 ) --// local import of socket methods //-- local send local receive local shutdown --// private closures of the object //-- local ssl local dispatch = listeners.onincoming local status = listeners.onstatus local disconnect = listeners.ondisconnect local drain = listeners.ondrain local bufferqueue = { } -- buffer array local bufferqueuelen = 0 -- end of buffer array local toclose local fatalerror local needtls local bufferlen = 0 local noread = false local nosend = false local sendtraffic, readtraffic = 0, 0 local maxsendlen = _maxsendlen local maxreadlen = _maxreadlen --// public methods of the object //-- local handler = bufferqueue -- saves a table ^_^ handler.dispatch = function( ) return dispatch end handler.disconnect = function( ) return disconnect end handler.setlistener = function( self, listeners ) dispatch = listeners.onincoming disconnect = listeners.ondisconnect status = listeners.onstatus drain = listeners.ondrain end handler.getstats = function( ) return readtraffic, sendtraffic end handler.ssl = function( ) return ssl end handler.sslctx = function ( ) return sslctx end handler.send = function( _, data, i, j ) return send( socket, data, i, j ) end handler.receive = function( pattern, prefix ) return receive( socket, pattern, prefix ) end handler.shutdown = function( pattern ) return shutdown( socket, pattern ) end handler.setoption = function (self, option, value) if socket.setoption then return socket:setoption(option, value); end return false, "setoption not implemented"; end handler.close = function( self, forced ) if not handler then return true; end _readlistlen = removesocket( _readlist, socket, _readlistlen ) _readtimes[ handler ] = nil if bufferqueuelen ~= 0 then if not ( forced or fatalerror ) then handler.sendbuffer( ) if bufferqueuelen ~= 0 then -- try again... if handler then handler.write = nil -- ... but no further writing allowed end toclose = true return false end else send( socket, table_concat( bufferqueue, "", 1, bufferqueuelen ), 1, bufferlen ) -- forced send end end if socket then _ = shutdown and shutdown( socket ) socket:close( ) _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) _socketlist[ socket ] = nil socket = nil else out_put "server.lua: socket already closed" end if handler then _writetimes[ handler ] = nil _closelist[ handler ] = nil handler = nil end if server then server.remove( ) end out_put "server.lua: closed client handler and removed socket from list" return true end handler.ip = function( ) return ip end handler.serverport = function( ) return serverport end handler.clientport = function( ) return clientport end local write = function( self, data ) bufferlen = bufferlen + string_len( data ) if bufferlen > maxsendlen then _closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle handler.write = idfalse -- dont write anymore return false elseif socket and not _sendlist[ socket ] then _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) end bufferqueuelen = bufferqueuelen + 1 bufferqueue[ bufferqueuelen ] = data if handler then _writetimes[ handler ] = _writetimes[ handler ] or _currenttime end return true end handler.write = write handler.bufferqueue = function( self ) return bufferqueue end handler.socket = function( self ) return socket end handler.set_mode = function( self, new ) pattern = new or pattern return pattern end handler.set_send = function ( self, newsend ) send = newsend or send return send end handler.bufferlen = function( self, readlen, sendlen ) maxsendlen = sendlen or maxsendlen maxreadlen = readlen or maxreadlen return bufferlen, maxreadlen, maxsendlen end --TODO: Deprecate handler.lock_read = function (self, switch) if switch == true then local tmp = _readlistlen _readlistlen = removesocket( _readlist, socket, _readlistlen ) _readtimes[ handler ] = nil if _readlistlen ~= tmp then noread = true end elseif switch == false then if noread then noread = false _readlistlen = addsocket(_readlist, socket, _readlistlen) _readtimes[ handler ] = _currenttime end end return noread end handler.pause = function (self) return self:lock_read(true); end handler.resume = function (self) return self:lock_read(false); end handler.lock = function( self, switch ) handler.lock_read (switch) if switch == true then handler.write = idfalse local tmp = _sendlistlen _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) _writetimes[ handler ] = nil if _sendlistlen ~= tmp then nosend = true end elseif switch == false then handler.write = write if nosend then nosend = false write( "" ) end end return noread, nosend end local _readbuffer = function( ) -- this function reads data local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" if not err or (err == "wantread" or err == "timeout") then -- received something local buffer = buffer or part or "" local len = string_len( buffer ) if len > maxreadlen then disconnect( handler, "receive buffer exceeded" ) handler:close( true ) return false end local count = len * STAT_UNIT readtraffic = readtraffic + count _readtraffic = _readtraffic + count _readtimes[ handler ] = _currenttime --out_put( "server.lua: read data '", buffer:gsub("[^%w%p ]", "."), "', error: ", err ) return dispatch( handler, buffer, err ) else -- connections was closed or fatal error out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) ) fatalerror = true disconnect( handler, err ) _ = handler and handler:close( ) return false end end local _sendbuffer = function( ) -- this function sends data local succ, err, byte, buffer, count; local count; if socket then buffer = table_concat( bufferqueue, "", 1, bufferqueuelen ) succ, err, byte = send( socket, buffer, 1, bufferlen ) count = ( succ or byte or 0 ) * STAT_UNIT sendtraffic = sendtraffic + count _sendtraffic = _sendtraffic + count _ = _cleanqueue and clean( bufferqueue ) --out_put( "server.lua: sended '", buffer, "', bytes: ", tostring(succ), ", error: ", tostring(err), ", part: ", tostring(byte), ", to: ", tostring(ip), ":", tostring(clientport) ) else succ, err, count = false, "closed", 0; end if succ then -- sending succesful bufferqueuelen = 0 bufferlen = 0 _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist _writetimes[ handler ] = nil if drain then drain(handler) end _ = needtls and handler:starttls(nil) _ = toclose and handler:close( ) return true elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write buffer = string_sub( buffer, byte + 1, bufferlen ) -- new buffer bufferqueue[ 1 ] = buffer -- insert new buffer in queue bufferqueuelen = 1 bufferlen = bufferlen - byte _writetimes[ handler ] = _currenttime return true else -- connection was closed during sending or fatal error out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) ) fatalerror = true disconnect( handler, err ) _ = handler and handler:close( ) return false end end -- Set the sslctx local handshake; function handler.set_sslctx(self, new_sslctx) ssl = true sslctx = new_sslctx; local wrote local read handshake = coroutine_wrap( function( client ) -- create handshake coroutine local err for i = 1, _maxsslhandshake do _sendlistlen = ( wrote and removesocket( _sendlist, client, _sendlistlen ) ) or _sendlistlen _readlistlen = ( read and removesocket( _readlist, client, _readlistlen ) ) or _readlistlen read, wrote = nil, nil _, err = client:dohandshake( ) if not err then out_put( "server.lua: ssl handshake done" ) handler.readbuffer = _readbuffer -- when handshake is done, replace the handshake function with regular functions handler.sendbuffer = _sendbuffer _ = status and status( handler, "ssl-handshake-complete" ) _readlistlen = addsocket(_readlist, client, _readlistlen) return true else out_put( "server.lua: error during ssl handshake: ", tostring(err) ) if err == "wantwrite" and not wrote then _sendlistlen = addsocket(_sendlist, client, _sendlistlen) wrote = true elseif err == "wantread" and not read then _readlistlen = addsocket(_readlist, client, _readlistlen) read = true else break; end --coroutine_yield( handler, nil, err ) -- handshake not finished coroutine_yield( ) end end disconnect( handler, "ssl handshake failed" ) _ = handler and handler:close( true ) -- forced disconnect return false -- handshake failed end ) end if luasec then if sslctx then -- ssl? handler:set_sslctx(sslctx); out_put("server.lua: ", "starting ssl handshake") local err socket, err = ssl_wrap( socket, sslctx ) -- wrap socket if err then out_put( "server.lua: ssl error: ", tostring(err) ) --mem_free( ) return nil, nil, err -- fatal error end socket:settimeout( 0 ) handler.readbuffer = handshake handler.sendbuffer = handshake handshake( socket ) -- do handshake if not socket then return nil, nil, "ssl handshake failed"; end else local sslctx; handler.starttls = function( self, _sslctx) if _sslctx then sslctx = _sslctx; handler:set_sslctx(sslctx); end if bufferqueuelen > 0 then out_put "server.lua: we need to do tls, but delaying until send buffer empty" needtls = true return end out_put( "server.lua: attempting to start tls on " .. tostring( socket ) ) local oldsocket, err = socket socket, err = ssl_wrap( socket, sslctx ) -- wrap socket --out_put( "server.lua: sslwrapped socket is " .. tostring( socket ) ) if err then out_put( "server.lua: error while starting tls on client: ", tostring(err) ) return nil, err -- fatal error end socket:settimeout( 0 ) -- add the new socket to our system send = socket.send receive = socket.receive shutdown = id _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) -- remove traces of the old socket _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) _socketlist[ oldsocket ] = nil handler.starttls = nil needtls = nil -- Secure now ssl = true handler.readbuffer = handshake handler.sendbuffer = handshake handshake( socket ) -- do handshake end handler.readbuffer = _readbuffer handler.sendbuffer = _sendbuffer end else handler.readbuffer = _readbuffer handler.sendbuffer = _sendbuffer end send = socket.send receive = socket.receive shutdown = ( ssl and id ) or socket.shutdown _socketlist[ socket ] = handler _readlistlen = addsocket(_readlist, socket, _readlistlen) return handler, socket end id = function( ) end idfalse = function( ) return false end addsocket = function( list, socket, len ) if not list[ socket ] then len = len + 1 list[ len ] = socket list[ socket ] = len end return len; end removesocket = function( list, socket, len ) -- this function removes sockets from a list ( copied from copas ) local pos = list[ socket ] if pos then list[ socket ] = nil local last = list[ len ] list[ len ] = nil if last ~= socket then list[ last ] = pos list[ pos ] = last end return len - 1 end return len end closesocket = function( socket ) _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) _readlistlen = removesocket( _readlist, socket, _readlistlen ) _socketlist[ socket ] = nil socket:close( ) --mem_free( ) end local function link(sender, receiver, buffersize) local sender_locked; local _sendbuffer = receiver.sendbuffer; function receiver.sendbuffer() _sendbuffer(); if sender_locked and receiver.bufferlen() < buffersize then sender:lock_read(false); -- Unlock now sender_locked = nil; end end local _readbuffer = sender.readbuffer; function sender.readbuffer() _readbuffer(); if not sender_locked and receiver.bufferlen() >= buffersize then sender_locked = true; sender:lock_read(true); end end end ----------------------------------// PUBLIC //-- addserver = function( addr, port, listeners, pattern, sslctx ) -- this function provides a way for other scripts to reg a server local err if type( listeners ) ~= "table" then err = "invalid listener table" end if type( port ) ~= "number" or not ( port >= 0 and port <= 65535 ) then err = "invalid port" elseif _server[ port ] then err = "listeners on port '" .. port .. "' already exist" elseif sslctx and not luasec then err = "luasec not found" end if err then out_error( "server.lua, port ", port, ": ", err ) return nil, err end addr = addr or "*" local server, err = socket_bind( addr, port ) if err then out_error( "server.lua, port ", port, ": ", err ) return nil, err end local handler, err = wrapserver( listeners, server, addr, port, pattern, sslctx, _maxclientsperserver ) -- wrap new server socket if not handler then server:close( ) return nil, err end server:settimeout( 0 ) _readlistlen = addsocket(_readlist, server, _readlistlen) _server[ port ] = handler _socketlist[ server ] = handler out_put( "server.lua: new "..(sslctx and "ssl " or "").."server listener on '", addr, ":", port, "'" ) return handler end getserver = function ( port ) return _server[ port ]; end removeserver = function( port ) local handler = _server[ port ] if not handler then return nil, "no server found on port '" .. tostring( port ) .. "'" end handler:close( ) _server[ port ] = nil return true end closeall = function( ) for _, handler in pairs( _socketlist ) do handler:close( ) _socketlist[ _ ] = nil end _readlistlen = 0 _sendlistlen = 0 _timerlistlen = 0 _server = { } _readlist = { } _sendlist = { } _timerlist = { } _socketlist = { } --mem_free( ) end getsettings = function( ) return _selecttimeout, _sleeptime, _maxsendlen, _maxreadlen, _checkinterval, _sendtimeout, _readtimeout, _cleanqueue, _maxclientsperserver, _maxsslhandshake end changesettings = function( new ) if type( new ) ~= "table" then return nil, "invalid settings table" end _selecttimeout = tonumber( new.timeout ) or _selecttimeout _sleeptime = tonumber( new.sleeptime ) or _sleeptime _maxsendlen = tonumber( new.maxsendlen ) or _maxsendlen _maxreadlen = tonumber( new.maxreadlen ) or _maxreadlen _checkinterval = tonumber( new.checkinterval ) or _checkinterval _sendtimeout = tonumber( new.sendtimeout ) or _sendtimeout _readtimeout = tonumber( new.readtimeout ) or _readtimeout _cleanqueue = new.cleanqueue _maxclientsperserver = new._maxclientsperserver or _maxclientsperserver _maxsslhandshake = new._maxsslhandshake or _maxsslhandshake return true end addtimer = function( listener ) if type( listener ) ~= "function" then return nil, "invalid listener function" end _timerlistlen = _timerlistlen + 1 _timerlist[ _timerlistlen ] = listener return true end stats = function( ) return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen end local quitting; setquitting = function (quit) quitting = not not quit; end loop = function(once) -- this is the main loop of the program if quitting then return "quitting"; end if once then quitting = "once"; end repeat local read, write, err = socket_select( _readlist, _sendlist, _selecttimeout ) for i, socket in ipairs( write ) do -- send data waiting in writequeues local handler = _socketlist[ socket ] if handler then handler.sendbuffer( ) else closesocket( socket ) out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen end end for i, socket in ipairs( read ) do -- receive data local handler = _socketlist[ socket ] if handler then handler.readbuffer( ) else closesocket( socket ) out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen end end for handler, err in pairs( _closelist ) do handler.disconnect( )( handler, err ) handler:close( true ) -- forced disconnect end clean( _closelist ) _currenttime = os_time( ) if os_difftime( _currenttime - _timer ) >= 1 then for i = 1, _timerlistlen do _timerlist[ i ]( _currenttime ) -- fire timers end _timer = _currenttime end socket_sleep( _sleeptime ) -- wait some time --collectgarbage( ) until quitting; if once and quitting == "once" then quitting = nil; return; end return "quitting" end step = function () return loop(true); end local function get_backend() return "select"; end --// EXPERIMENTAL //-- local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx ) local handler = wrapconnection( nil, listeners, socket, ip, serverport, "clientport", pattern, sslctx ) _socketlist[ socket ] = handler _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) if listeners.onconnect then -- When socket is writeable, call onconnect local _sendbuffer = handler.sendbuffer; handler.sendbuffer = function () handler.sendbuffer = _sendbuffer; listeners.onconnect(handler); -- If there was data with the incoming packet, handle it now. if #handler:bufferqueue() > 0 then return _sendbuffer(); end end end return handler, socket end local addclient = function( address, port, listeners, pattern, sslctx ) local client, err = luasocket.tcp( ) if err then return nil, err end client:settimeout( 0 ) _, err = client:connect( address, port ) if err then -- try again local handler = wrapclient( client, address, port, listeners ) else wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx ) end end --// EXPERIMENTAL //-- ----------------------------------// BEGIN //-- use "setmetatable" ( _socketlist, { __mode = "k" } ) use "setmetatable" ( _readtimes, { __mode = "k" } ) use "setmetatable" ( _writetimes, { __mode = "k" } ) _timer = os_time( ) _starttime = os_time( ) addtimer( function( ) local difftime = os_difftime( _currenttime - _starttime ) if difftime > _checkinterval then _starttime = _currenttime for handler, timestamp in pairs( _writetimes ) do if os_difftime( _currenttime - timestamp ) > _sendtimeout then --_writetimes[ handler ] = nil handler.disconnect( )( handler, "send timeout" ) handler:close( true ) -- forced disconnect end end for handler, timestamp in pairs( _readtimes ) do if os_difftime( _currenttime - timestamp ) > _readtimeout then --_readtimes[ handler ] = nil handler.disconnect( )( handler, "read timeout" ) handler:close( ) -- forced disconnect? end end end end ) local function setlogger(new_logger) local old_logger = log; if new_logger then log = new_logger; end return old_logger; end ----------------------------------// PUBLIC INTERFACE //-- return { addclient = addclient, wrapclient = wrapclient, loop = loop, link = link, stats = stats, closeall = closeall, addtimer = addtimer, addserver = addserver, getserver = getserver, setlogger = setlogger, getsettings = getsettings, setquitting = setquitting, removeserver = removeserver, get_backend = get_backend, changesettings = changesettings, }