# HG changeset patch # User Matthew Wild # Date 1259170763 0 # Node ID 141896297ceaa849ceeeb421ed47cbc0ed4e9df0 # Parent 2abca9cc78b1c134093f5cc9d8bf53c60e394e41# Parent 7cb6460b18d8202c4cc18df301146e852dde2690 Merge with Tobias's SASL redesign branch diff -r 7cb6460b18d8 -r 141896297cea core/loggingmanager.lua --- a/core/loggingmanager.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/core/loggingmanager.lua Wed Nov 25 17:39:23 2009 +0000 @@ -17,6 +17,12 @@ local os_date, os_getenv = os.date, os.getenv; local getstyle, getstring = require "util.termcolours".getstyle, require "util.termcolours".getstring; +if os.getenv("__FLUSH_LOG") then + local io_flush = io.flush; + local _io_write = io_write; + io_write = function(...) _io_write(...); io_flush(); end +end + local config = require "core.configmanager"; local eventmanager = require "core.eventmanager"; local logger = require "util.logger"; diff -r 7cb6460b18d8 -r 141896297cea core/modulemanager.lua --- a/core/modulemanager.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/core/modulemanager.lua Wed Nov 25 17:39:23 2009 +0000 @@ -28,7 +28,9 @@ local next = next; local rawget = rawget; local error = error; -local tostring = tostring; +local tostring, tonumber = tostring, tonumber; + +local array, set = require "util.array", require "util.set"; local autoload_modules = {"presence", "message", "iq"}; @@ -400,6 +402,85 @@ return value; end +function api:get_option_string(...) + local value = self:get_option(...); + if type(value) == "table" then + if #value > 1 then + self:log("error", "Config option '%s' does not take a list, using just the first item", name); + end + value = value[1]; + end + if value == nil then + return nil; + end + return tostring(value); +end + +function api:get_option_number(name, ...) + local value = self:get_option(name, ...); + if type(value) == "table" then + if #value > 1 then + self:log("error", "Config option '%s' does not take a list, using just the first item", name); + end + value = value[1]; + end + local ret = tonumber(value); + if value ~= nil and ret == nil then + self:log("error", "Config option '%s' not understood, expecting a number", name); + end + return ret; +end + +function api:get_option_boolean(name, ...) + local value = self:get_option(name, ...); + if type(value) == "table" then + if #value > 1 then + self:log("error", "Config option '%s' does not take a list, using just the first item", name); + end + value = value[1]; + end + if value == nil then + return nil; + end + local ret = value == true or value == "true" or value == 1 or nil; + if ret == nil then + ret = (value == false or value == "false" or value == 0); + if ret then + ret = false; + else + ret = nil; + end + end + if ret == nil then + self:log("error", "Config option '%s' not understood, expecting true/false", name); + end + return ret; +end + +function api:get_option_array(name, ...) + local value = self:get_option(name, ...); + + if value == nil then + return nil; + end + + if type(value) ~= "table" then + return array{ value }; -- Assume any non-list is a single-item list + end + + return array():append(value); -- Clone +end + +function api:get_option_set(name, ...) + local value = self:get_option_array(name, ...); + + if value == nil then + return nil; + end + + return set.new(value); +end + local t_remove = _G.table.remove; local module_items = multitable_new(); function api:add_item(key, value) diff -r 7cb6460b18d8 -r 141896297cea core/objectmanager.lua --- a/core/objectmanager.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/core/objectmanager.lua Wed Nov 25 17:39:23 2009 +0000 @@ -6,63 +6,63 @@ -- COPYING file in the source package for more information. -- - -local new_multitable = require "util.multitable".new; -local t_insert = table.insert; -local t_concat = table.concat; -local tostring = tostring; -local unpack = unpack; -local pairs = pairs; -local error = error; -local type = type; -local _G = _G; - -local data = new_multitable(); - -module "objectmanager" - -function set(...) - return data:set(...); -end -function remove(...) - return data:remove(...); -end -function get(...) - return data:get(...); -end - -local function get_path(path) - if type(path) == "table" then return path; end - local s = {}; - for part in tostring(path):gmatch("[%w_]+") do - t_insert(s, part); - end - return s; -end - -function get_object(path) - path = get_path(path) - return data:get(unpack(path)), path; -end -function set_object(path, object) - path = get_path(path); - data:set(unpack(path), object); -end - -data:set("ls", function(_dir) - local obj, dir = get_object(_dir); - if not obj then error("object not found: " .. t_concat(dir, '/')); end - local r = {}; - if type(obj) == "table" then - for key, val in pairs(obj) do - r[key] = type(val); - end - end - return r; -end); -data:set("get", get_object); -data:set("set", set_object); -data:set("echo", function(...) return {...}; end); -data:set("_G", _G); - -return _M; + +local new_multitable = require "util.multitable".new; +local t_insert = table.insert; +local t_concat = table.concat; +local tostring = tostring; +local unpack = unpack; +local pairs = pairs; +local error = error; +local type = type; +local _G = _G; + +local data = new_multitable(); + +module "objectmanager" + +function set(...) + return data:set(...); +end +function remove(...) + return data:remove(...); +end +function get(...) + return data:get(...); +end + +local function get_path(path) + if type(path) == "table" then return path; end + local s = {}; + for part in tostring(path):gmatch("[%w_]+") do + t_insert(s, part); + end + return s; +end + +function get_object(path) + path = get_path(path) + return data:get(unpack(path)), path; +end +function set_object(path, object) + path = get_path(path); + data:set(unpack(path), object); +end + +data:set("ls", function(_dir) + local obj, dir = get_object(_dir); + if not obj then error("object not found: " .. t_concat(dir, '/')); end + local r = {}; + if type(obj) == "table" then + for key, val in pairs(obj) do + r[key] = type(val); + end + end + return r; +end); +data:set("get", get_object); +data:set("set", set_object); +data:set("echo", function(...) return {...}; end); +data:set("_G", _G); + +return _M; diff -r 7cb6460b18d8 -r 141896297cea core/s2smanager.lua --- a/core/s2smanager.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/core/s2smanager.lua Wed Nov 25 17:39:23 2009 +0000 @@ -79,15 +79,15 @@ end function send_to_host(from_host, to_host, data) + if not hosts[from_host] then + log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host); + return false; + end local host = hosts[from_host].s2sout[to_host]; if host then -- We have a connection to this host already - if host.type == "s2sout_unauthed" and data.name ~= "db:verify" and ((not data.xmlns) or data.xmlns == "jabber:client" or data.xmlns == "jabber:server") then + if host.type == "s2sout_unauthed" and (data.name ~= "db:verify" or not host.dialback_key) and ((not data.xmlns) or data.xmlns == "jabber:client" or data.xmlns == "jabber:server") then (host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host); - if not host.notopen and not host.dialback_key and host.sends2s then - host.log("debug", "dialback had not been initiated"); - initiate_dialback(host); - end -- Queue stanza until we are able to send it if host.sendq then t_insert(host.sendq, {tostring(data), st.reply(data)}); @@ -110,6 +110,7 @@ else log("debug", "opening a new outgoing connection for this stanza"); local host_session = new_outgoing(from_host, to_host); + -- Store in buffer host_session.sendq = { {tostring(data), st.reply(data)} }; log("debug", "stanza [%s] queued until connection complete", tostring(data.name)); @@ -131,7 +132,7 @@ open_sessions = open_sessions + 1; local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$")); session.log = log; - session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end + session.sends2s = function (t) log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)")); w(conn, tostring(t)); end incoming_s2s[session] = true; add_task(connect_timeout, function () if session.conn ~= conn or @@ -159,7 +160,7 @@ host_session.log = log; end - -- This is the first call, can't fail (the first step is DNS lookup) + -- Kick the connection attempting machine attempt_connection(host_session); if not host_session.sends2s then @@ -187,6 +188,10 @@ local from_host, to_host = host_session.from_host, host_session.to_host; local connect_host, connect_port = idna_to_ascii(to_host), 5269; + if not connect_host then + return false; + end + if not err then -- This is our first attempt log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host); host_session.connecting = true; @@ -316,9 +321,9 @@ cl.register_outgoing(conn, host_session); local w, log = conn.write, host_session.log; - host_session.sends2s = function (t) log("debug", "sending: %s", tostring(t)); w(tostring(t)); end + host_session.sends2s = function (t) log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?")); w(conn, tostring(t)); end - conn.write(format([[]], from_host, to_host)); + conn:write(format([[]], from_host, to_host)); log("debug", "Connection attempt in progress..."); add_task(connect_timeout, function () if host_session.conn ~= conn or diff -r 7cb6460b18d8 -r 141896297cea core/sessionmanager.lua --- a/core/sessionmanager.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/core/sessionmanager.lua Wed Nov 25 17:39:23 2009 +0000 @@ -10,7 +10,6 @@ local tonumber, tostring = tonumber, tostring; local ipairs, pairs, print, next= ipairs, pairs, print, next; -local collectgarbage = collectgarbage; local format = import("string", "format"); local hosts = hosts; @@ -50,8 +49,8 @@ open_sessions = open_sessions + 1; log("debug", "open sessions now: ".. open_sessions); local w = conn.write; - session.send = function (t) w(tostring(t)); end - session.ip = conn.ip(); + session.send = function (t) w(conn, tostring(t)); end + session.ip = conn:ip(); local conn_name = "c2s"..tostring(conn):match("[a-f0-9]+$"); session.log = logger.init(conn_name); @@ -201,22 +200,32 @@ end function send_to_available_resources(user, host, stanza) + local jid = user.."@"..host; local count = 0; - local to = stanza.attr.to; - stanza.attr.to = nil; - local h = hosts[host]; - if h and h.type == "local" then - local u = h.sessions[user]; - if u then - for k, session in pairs(u.sessions) do - if session.presence then - session.send(stanza); - count = count + 1; - end + local user = bare_sessions[jid]; + if user then + for k, session in pairs(user.sessions) do + if session.presence then + session.send(stanza); + count = count + 1; end end end - stanza.attr.to = to; + return count; +end + +function send_to_interested_resources(user, host, stanza) + local jid = user.."@"..host; + local count = 0; + local user = bare_sessions[jid]; + if user then + for k, session in pairs(user.sessions) do + if session.interested then + session.send(stanza); + count = count + 1; + end + end + end return count; end diff -r 7cb6460b18d8 -r 141896297cea core/stanza_router.lua --- a/core/stanza_router.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/core/stanza_router.lua Wed Nov 25 17:39:23 2009 +0000 @@ -180,7 +180,7 @@ local xmlns = stanza.attr.xmlns; --stanza.attr.xmlns = "jabber:server"; stanza.attr.xmlns = nil; - log("debug", "sending s2s stanza: %s", tostring(stanza)); + log("debug", "sending s2s stanza: %s", tostring(stanza.top_tag and stanza:top_tag()) or stanza); send_s2s(origin.host, host, stanza); -- TODO handle remote routing errors stanza.attr.xmlns = xmlns; -- reset else diff -r 7cb6460b18d8 -r 141896297cea net/adns.lua --- a/net/adns.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/adns.lua Wed Nov 25 17:39:23 2009 +0000 @@ -45,10 +45,10 @@ function new_async_socket(sock, resolver) local newconn, peername = {}, ""; local listener = {}; - function listener.incoming(conn, data) + function listener.onincoming(conn, data) dns.feed(sock, data); end - function listener.disconnect(conn, err) + function listener.ondisconnect(conn, err) log("warn", "DNS socket for %s disconnected: %s", peername, err); local servers = resolver.server; if resolver.socketset[newconn.handler] == resolver.best_server and resolver.best_server == #servers then @@ -68,7 +68,7 @@ newconn.handler.setsockname = function (_, ...) return sock:setsockname(...); end newconn.handler.setpeername = function (_, ...) peername = (...); local ret = sock:setpeername(...); _.setsend(sock.send); return ret; end newconn.handler.connect = function (_, ...) return sock:connect(...) end - newconn.handler.send = function (_, data) _.write(data); return _.sendbuffer(); end + newconn.handler.send = function (_, data) _.write(_, data); return _.sendbuffer(); end return newconn.handler; end diff -r 7cb6460b18d8 -r 141896297cea net/connlisteners.lua --- a/net/connlisteners.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/connlisteners.lua Wed Nov 25 17:39:23 2009 +0000 @@ -61,9 +61,14 @@ end end - return server.addserver(h, - (udata and udata.port) or h.default_port or error("Can't start listener "..name.." because no port was specified, and it has no default port", 0), - (udata and udata.interface) or h.default_interface or "*", (udata and udata.mode) or h.default_mode or 1, (udata and udata.ssl) or nil, 99999999, udata and udata.type == "ssl"); + local interface = (udata and udata.interface) or h.default_interface or "*"; + local port = (udata and udata.port) or h.default_port or error("Can't start listener "..name.." because no port was specified, and it has no default port", 0); + local mode = (udata and udata.mode) or h.default_mode or 1; + local ssl = (udata and udata.ssl) or nil; + local maxclients = 99999999; + local autossl = udata and udata.type == "ssl"; + + return server.addserver(interface, port, h, mode, ssl, autossl); end return _M; diff -r 7cb6460b18d8 -r 141896297cea net/http.lua --- a/net/http.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/http.lua Wed Nov 25 17:39:23 2009 +0000 @@ -152,7 +152,7 @@ end req.handler, req.conn = server.wrapclient(socket.tcp(), req.host, req.port or 80, listener, "*a"); - req.write = req.handler.write; + req.write = function (...) return req.handler:write(...); end req.conn:settimeout(0); local ok, err = req.conn:connect(req.host, req.port or 80); if not ok and err ~= "timeout" then @@ -200,7 +200,7 @@ function destroy_request(request) if request.conn then request.handler.close() - listener.disconnect(request.conn, "closed"); + listener.ondisconnect(request.conn, "closed"); end end diff -r 7cb6460b18d8 -r 141896297cea net/httpclient_listener.lua --- a/net/httpclient_listener.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/httpclient_listener.lua Wed Nov 25 17:39:23 2009 +0000 @@ -15,7 +15,7 @@ local httpclient = { default_port = 80, default_mode = "*a" }; -function httpclient.listener(conn, data) +function httpclient.onincoming(conn, data) local request = requests[conn]; if not request then @@ -28,7 +28,7 @@ end end -function httpclient.disconnect(conn, err) +function httpclient.ondisconnect(conn, err) local request = requests[conn]; if request then request:reader(nil); diff -r 7cb6460b18d8 -r 141896297cea net/httpserver.lua --- a/net/httpserver.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/httpserver.lua Wed Nov 25 17:39:23 2009 +0000 @@ -209,7 +209,7 @@ function new_request(handler) return { handler = handler, conn = handler.socket, - write = handler.write, state = "request", + write = function (...) return handler:write(...); end, state = "request", server = http_servers[handler.serverport()], send = send_response, destroy = destroy_request, @@ -230,7 +230,7 @@ end request.handler.close() if request.conn then - listener.disconnect(request.handler, "closed"); + listener.ondisconnect(request.handler, "closed"); end end end diff -r 7cb6460b18d8 -r 141896297cea net/httpserver_listener.lua --- a/net/httpserver_listener.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/httpserver_listener.lua Wed Nov 25 17:39:23 2009 +0000 @@ -16,7 +16,7 @@ local httpserver = { default_port = 80, default_mode = "*a" }; -function httpserver.listener(conn, data) +function httpserver.onincoming(conn, data) local request = requests[conn]; if not request then @@ -34,7 +34,7 @@ end end -function httpserver.disconnect(conn, err) +function httpserver.ondisconnect(conn, err) local request = requests[conn]; if request and not request.destroyed then request.conn = nil; diff -r 7cb6460b18d8 -r 141896297cea net/server.lua --- a/net/server.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/server.lua Wed Nov 25 17:39:23 2009 +0000 @@ -1,914 +1,33 @@ --- --- server.lua by blastbeat of the luadch project --- Re-used here under the MIT/X Consortium License --- --- Modifications (C) 2008-2009 Matthew Wild, Waqas Hussain --- - --- // wrapping luadch stuff // -- - -local use = function( what ) - return _G[ what ] -end -local clean = function( tbl ) - for i, k in pairs( tbl ) do - tbl[ i ] = nil - end -end - -local log, table_concat = require ("util.logger").init("socket"), table.concat; -local out_put = function (...) return log("debug", table_concat{...}); end -local out_error = function (...) return log("warn", table_concat{...}); end -local mem_free = collectgarbage - -----------------------------------// DECLARATION //-- - ---// constants //-- - -local STAT_UNIT = 1 -- byte - ---// lua functions //-- - -local type = use "type" -local pairs = use "pairs" -local ipairs = use "ipairs" -local tostring = use "tostring" -local collectgarbage = use "collectgarbage" - ---// lua libs //-- - -local os = use "os" -local table = use "table" -local string = use "string" -local coroutine = use "coroutine" - ---// lua lib methods //-- - -local os_time = os.time -local os_difftime = os.difftime -local table_concat = table.concat -local table_remove = table.remove -local string_len = string.len -local string_sub = string.sub -local coroutine_wrap = coroutine.wrap -local coroutine_yield = coroutine.yield - ---// extern libs //-- - -local luasec = select( 2, pcall( require, "ssl" ) ) -local luasocket = require "socket" - ---// extern lib methods //-- - -local ssl_wrap = ( luasec and luasec.wrap ) -local socket_bind = luasocket.bind -local socket_sleep = luasocket.sleep -local socket_select = luasocket.select -local ssl_newcontext = ( luasec and luasec.newcontext ) - ---// functions //-- - -local id -local loop -local stats -local idfalse -local addtimer -local closeall -local addserver -local getserver -local wrapserver -local getsettings -local closesocket -local removesocket -local removeserver -local changetimeout -local wrapconnection -local changesettings - ---// tables //-- - -local _server -local _readlist -local _timerlist -local _sendlist -local _socketlist -local _closelist -local _readtimes -local _writetimes - ---// simple data types //-- - -local _ -local _readlistlen -local _sendlistlen -local _timerlistlen - -local _sendtraffic -local _readtraffic - -local _selecttimeout -local _sleeptime - -local _starttime -local _currenttime - -local _maxsendlen -local _maxreadlen - -local _checkinterval -local _sendtimeout -local _readtimeout - -local _cleanqueue - -local _timer - -local _maxclientsperserver - -----------------------------------// DEFINITION //-- - -_server = { } -- key = port, value = table; list of listening servers -_readlist = { } -- array with sockets to read from -_sendlist = { } -- arrary with sockets to write to -_timerlist = { } -- array of timer functions -_socketlist = { } -- key = socket, value = wrapped socket (handlers) -_readtimes = { } -- key = handler, value = timestamp of last data reading -_writetimes = { } -- key = handler, value = timestamp of last data writing/sending -_closelist = { } -- handlers to close - -_readlistlen = 0 -- length of readlist -_sendlistlen = 0 -- length of sendlist -_timerlistlen = 0 -- lenght of timerlist - -_sendtraffic = 0 -- some stats -_readtraffic = 0 - -_selecttimeout = 1 -- timeout of socket.select -_sleeptime = 0 -- time to wait at the end of every loop - -_maxsendlen = 51000 * 1024 -- max len of send buffer -_maxreadlen = 25000 * 1024 -- max len of read buffer - -_checkinterval = 1200000 -- interval in secs to check idle clients -_sendtimeout = 60000 -- allowed send idle time in secs -_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs - -_cleanqueue = false -- clean bufferqueue after using - -_maxclientsperserver = 1000 - -_maxsslhandshake = 30 -- max handshake round-trips -----------------------------------// PRIVATE //-- - -wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx, maxconnections, startssl ) -- this function wraps a server - - maxconnections = maxconnections or _maxclientsperserver - - local connections = 0 - - local dispatch, disconnect = listeners.incoming or listeners.listener, listeners.disconnect - - local err - - local ssl = false - - if sslctx then - ssl = true - if not ssl_newcontext then - out_error "luasec not found" - ssl = false - end - if type( sslctx ) ~= "table" then - out_error "server.lua: wrong server sslctx" - ssl = false - end - local ctx; - ctx, err = ssl_newcontext( sslctx ) - if not ctx then - err = err or "wrong sslctx parameters" - local file; - file = err:match("^error loading (.-) %("); - if file then - if file == "private key" then - file = sslctx.key or "your private key"; - elseif file == "certificate" then - file = sslctx.certificate or "your certificate file"; - end - local reason = err:match("%((.+)%)$") or "some reason"; - if reason == "Permission denied" then - reason = "Check that the permissions allow Prosody to read this file."; - elseif reason == "No such file or directory" then - reason = "Check that the path is correct, and the file exists."; - elseif reason == "system lib" then - reason = "Previous error (see logs), or other system error."; - else - reason = "Reason: "..tostring(reason or "unknown"):lower(); - end - log("error", "SSL/TLS: Failed to load %s: %s", file, reason); - else - log("error", "SSL/TLS: Error initialising for port %d: %s", serverport, err ); - end - ssl = false - end - sslctx = ctx; - end - if not ssl then - sslctx = false; - if startssl then - log("error", "Failed to listen on port %d due to SSL/TLS to SSL/TLS initialisation errors (see logs)", serverport ) - return nil, "Cannot start ssl, see log for details" - end - end - - local accept = socket.accept - - --// public methods of the object //-- - - local handler = { } - - handler.shutdown = function( ) end - - handler.ssl = function( ) - return ssl - end - handler.sslctx = function( ) - return sslctx - end - handler.remove = function( ) - connections = connections - 1 - end - handler.close = function( ) - for _, handler in pairs( _socketlist ) do - if handler.serverport == serverport then - handler.disconnect( handler, "server closed" ) - handler.close( true ) - end - end - socket:close( ) - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _socketlist[ socket ] = nil - handler = nil - socket = nil - --mem_free( ) - out_put "server.lua: closed server handler and removed sockets from list" - end - handler.ip = function( ) - return ip - end - handler.serverport = function( ) - return serverport - end - handler.socket = function( ) - return socket - end - handler.readbuffer = function( ) - if connections > maxconnections then - out_put( "server.lua: refused new client connection: server full" ) - return false - end - local client, err = accept( socket ) -- try to accept - if client then - local ip, clientport = client:getpeername( ) - client:settimeout( 0 ) - local handler, client, err = wrapconnection( handler, listeners, client, ip, serverport, clientport, pattern, sslctx, startssl ) -- wrap new client socket - if err then -- error while wrapping ssl socket - return false - end - connections = connections + 1 - out_put( "server.lua: accepted new client connection from ", tostring(ip), ":", tostring(clientport), " to ", tostring(serverport)) - return dispatch( handler ) - elseif err then -- maybe timeout or something else - out_put( "server.lua: error with new client connection: ", tostring(err) ) - return false - end - end - return handler -end - -wrapconnection = function( server, listeners, socket, ip, serverport, clientport, pattern, sslctx, startssl ) -- this function wraps a client to a handler object - - socket:settimeout( 0 ) - - --// local import of socket methods //-- - - local send - local receive - local shutdown - - --// private closures of the object //-- - - local ssl - - local dispatch = listeners.incoming or listeners.listener - local status = listeners.status - local disconnect = listeners.disconnect - - local bufferqueue = { } -- buffer array - local bufferqueuelen = 0 -- end of buffer array - - local toclose - local fatalerror - local needtls - - local bufferlen = 0 - - local noread = false - local nosend = false - - local sendtraffic, readtraffic = 0, 0 - - local maxsendlen = _maxsendlen - local maxreadlen = _maxreadlen - - --// public methods of the object //-- - - local handler = bufferqueue -- saves a table ^_^ - - handler.dispatch = function( ) - return dispatch - end - handler.disconnect = function( ) - return disconnect - end - handler.setlistener = function( listeners ) - dispatch = listeners.incoming - disconnect = listeners.disconnect - end - handler.getstats = function( ) - return readtraffic, sendtraffic - end - handler.ssl = function( ) - return ssl - end - handler.sslctx = function ( ) - return sslctx - end - handler.send = function( _, data, i, j ) - return send( socket, data, i, j ) - end - handler.receive = function( pattern, prefix ) - return receive( socket, pattern, prefix ) - end - handler.shutdown = function( pattern ) - return shutdown( socket, pattern ) - end - handler.close = function( forced ) - if not handler then return true; end - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _readtimes[ handler ] = nil - if bufferqueuelen ~= 0 then - if not ( forced or fatalerror ) then - handler.sendbuffer( ) - if bufferqueuelen ~= 0 then -- try again... - if handler then - handler.write = nil -- ... but no further writing allowed - end - toclose = true - return false - end - else - send( socket, table_concat( bufferqueue, "", 1, bufferqueuelen ), 1, bufferlen ) -- forced send - end - end - if socket then - _ = shutdown and shutdown( socket ) - socket:close( ) - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _socketlist[ socket ] = nil - socket = nil - else - out_put "server.lua: socket already closed" - end - if handler then - _writetimes[ handler ] = nil - _closelist[ handler ] = nil - handler = nil - end - if server then - server.remove( ) - end - out_put "server.lua: closed client handler and removed socket from list" - return true - end - handler.ip = function( ) - return ip - end - handler.serverport = function( ) - return serverport - end - handler.clientport = function( ) - return clientport - end - local write = function( data ) - bufferlen = bufferlen + string_len( data ) - if bufferlen > maxsendlen then - _closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle - handler.write = idfalse -- dont write anymore - return false - elseif socket and not _sendlist[ socket ] then - _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) - end - bufferqueuelen = bufferqueuelen + 1 - bufferqueue[ bufferqueuelen ] = data - if handler then - _writetimes[ handler ] = _writetimes[ handler ] or _currenttime - end - return true - end - handler.write = write - handler.bufferqueue = function( ) - return bufferqueue - end - handler.socket = function( ) - return socket - end - handler.pattern = function( new ) - pattern = new or pattern - return pattern - end - handler.setsend = function ( newsend ) - send = newsend or send - return send - end - handler.bufferlen = function( readlen, sendlen ) - maxsendlen = sendlen or maxsendlen - maxreadlen = readlen or maxreadlen - return maxreadlen, maxsendlen - end - handler.lock = function( switch ) - if switch == true then - handler.write = idfalse - local tmp = _sendlistlen - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _writetimes[ handler ] = nil - if _sendlistlen ~= tmp then - nosend = true - end - tmp = _readlistlen - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _readtimes[ handler ] = nil - if _readlistlen ~= tmp then - noread = true - end - elseif switch == false then - handler.write = write - if noread then - noread = false - _readlistlen = addsocket(_readlist, socket, _readlistlen) - _readtimes[ handler ] = _currenttime - end - if nosend then - nosend = false - write( "" ) - end - end - return noread, nosend - end - local _readbuffer = function( ) -- this function reads data - local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" - if not err or ( err == "timeout" or err == "wantread" ) then -- received something - local buffer = buffer or part or "" - local len = string_len( buffer ) - if len > maxreadlen then - disconnect( handler, "receive buffer exceeded" ) - handler.close( true ) - return false - end - local count = len * STAT_UNIT - readtraffic = readtraffic + count - _readtraffic = _readtraffic + count - _readtimes[ handler ] = _currenttime - --out_put( "server.lua: read data '", buffer:gsub("[^%w%p ]", "."), "', error: ", err ) - return dispatch( handler, buffer, err ) - else -- connections was closed or fatal error - out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) ) - fatalerror = true - disconnect( handler, err ) - _ = handler and handler.close( ) - return false - end - end - local _sendbuffer = function( ) -- this function sends data - local succ, err, byte, buffer, count; - local count; - if socket then - buffer = table_concat( bufferqueue, "", 1, bufferqueuelen ) - succ, err, byte = send( socket, buffer, 1, bufferlen ) - count = ( succ or byte or 0 ) * STAT_UNIT - sendtraffic = sendtraffic + count - _sendtraffic = _sendtraffic + count - _ = _cleanqueue and clean( bufferqueue ) - --out_put( "server.lua: sended '", buffer, "', bytes: ", tostring(succ), ", error: ", tostring(err), ", part: ", tostring(byte), ", to: ", tostring(ip), ":", tostring(clientport) ) - else - succ, err, count = false, "closed", 0; - end - if succ then -- sending succesful - bufferqueuelen = 0 - bufferlen = 0 - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist - _ = needtls and handler.starttls(true) - _writetimes[ handler ] = nil - _ = toclose and handler.close( ) - return true - elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write - buffer = string_sub( buffer, byte + 1, bufferlen ) -- new buffer - bufferqueue[ 1 ] = buffer -- insert new buffer in queue - bufferqueuelen = 1 - bufferlen = bufferlen - byte - _writetimes[ handler ] = _currenttime - return true - else -- connection was closed during sending or fatal error - out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) ) - fatalerror = true - disconnect( handler, err ) - _ = handler and handler.close( ) - return false - end - end - - -- Set the sslctx - local handshake; - function handler.set_sslctx(new_sslctx) - ssl = true - sslctx = new_sslctx; - local wrote - local read - handshake = coroutine_wrap( function( client ) -- create handshake coroutine - local err - for i = 1, _maxsslhandshake do - _sendlistlen = ( wrote and removesocket( _sendlist, client, _sendlistlen ) ) or _sendlistlen - _readlistlen = ( read and removesocket( _readlist, client, _readlistlen ) ) or _readlistlen - read, wrote = nil, nil - _, err = client:dohandshake( ) - if not err then - out_put( "server.lua: ssl handshake done" ) - handler.readbuffer = _readbuffer -- when handshake is done, replace the handshake function with regular functions - handler.sendbuffer = _sendbuffer - _ = status and status( handler, "ssl-handshake-complete" ) - _readlistlen = addsocket(_readlist, client, _readlistlen) - return true - else - out_put( "server.lua: error during ssl handshake: ", tostring(err) ) - if err == "wantwrite" and not wrote then - _sendlistlen = addsocket(_sendlist, client, _sendlistlen) - wrote = true - elseif err == "wantread" and not read then - _readlistlen = addsocket(_readlist, client, _readlistlen) - read = true - else - break; - end - --coroutine_yield( handler, nil, err ) -- handshake not finished - coroutine_yield( ) - end - end - disconnect( handler, "ssl handshake failed" ) - _ = handler and handler.close( true ) -- forced disconnect - return false -- handshake failed - end - ) - end - if sslctx then -- ssl? - handler.set_sslctx(sslctx); - if startssl then -- ssl now? - --out_put("server.lua: ", "starting ssl handshake") - local err - socket, err = ssl_wrap( socket, sslctx ) -- wrap socket - if err then - out_put( "server.lua: ssl error: ", tostring(err) ) - --mem_free( ) - return nil, nil, err -- fatal error - end - socket:settimeout( 0 ) - handler.readbuffer = handshake - handler.sendbuffer = handshake - handshake( socket ) -- do handshake - if not socket then - return nil, nil, "ssl handshake failed"; - end - else - -- We're not automatically doing SSL, so we're not secure (yet) - ssl = false - handler.starttls = function( now ) - if not now then - --out_put "server.lua: we need to do tls, but delaying until later" - needtls = true - return - end - --out_put( "server.lua: attempting to start tls on " .. tostring( socket ) ) - local oldsocket, err = socket - socket, err = ssl_wrap( socket, sslctx ) -- wrap socket - --out_put( "server.lua: sslwrapped socket is " .. tostring( socket ) ) - if err then - out_put( "server.lua: error while starting tls on client: ", tostring(err) ) - return nil, err -- fatal error - end - - socket:settimeout( 0 ) - - -- add the new socket to our system - - send = socket.send - receive = socket.receive - shutdown = id - - _socketlist[ socket ] = handler - _readlistlen = addsocket(_readlist, socket, _readlistlen) - - -- remove traces of the old socket - - _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) - _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) - _socketlist[ oldsocket ] = nil - - handler.starttls = nil - needtls = nil - - -- Secure now - ssl = true - - handler.readbuffer = handshake - handler.sendbuffer = handshake - handshake( socket ) -- do handshake - end - handler.readbuffer = _readbuffer - handler.sendbuffer = _sendbuffer - end - else -- normal connection - ssl = false - handler.readbuffer = _readbuffer - handler.sendbuffer = _sendbuffer - end - - send = socket.send - receive = socket.receive - shutdown = ( ssl and id ) or socket.shutdown - - _socketlist[ socket ] = handler - _readlistlen = addsocket(_readlist, socket, _readlistlen) - - return handler, socket -end - -id = function( ) -end - -idfalse = function( ) - return false -end - -addsocket = function( list, socket, len ) - if not list[ socket ] then - len = len + 1 - list[ len ] = socket - list[ socket ] = len - end - return len; -end - -removesocket = function( list, socket, len ) -- this function removes sockets from a list ( copied from copas ) - local pos = list[ socket ] - if pos then - list[ socket ] = nil - local last = list[ len ] - list[ len ] = nil - if last ~= socket then - list[ last ] = pos - list[ pos ] = last - end - return len - 1 - end - return len -end - -closesocket = function( socket ) - _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) - _readlistlen = removesocket( _readlist, socket, _readlistlen ) - _socketlist[ socket ] = nil - socket:close( ) - --mem_free( ) -end - -----------------------------------// PUBLIC //-- - -addserver = function( listeners, port, addr, pattern, sslctx, maxconnections, startssl ) -- this function provides a way for other scripts to reg a server - local err - --out_put("server.lua: autossl on ", port, " is ", startssl) - if type( listeners ) ~= "table" then - err = "invalid listener table" - end - if not type( port ) == "number" or not ( port >= 0 and port <= 65535 ) then - err = "invalid port" - elseif _server[ port ] then - err = "listeners on port '" .. port .. "' already exist" - elseif sslctx and not luasec then - err = "luasec not found" - end - if err then - out_error( "server.lua, port ", port, ": ", err ) - return nil, err - end - addr = addr or "*" - local server, err = socket_bind( addr, port ) - if err then - out_error( "server.lua, port ", port, ": ", err ) - return nil, err - end - local handler, err = wrapserver( listeners, server, addr, port, pattern, sslctx, maxconnections, startssl ) -- wrap new server socket - if not handler then - server:close( ) - return nil, err - end - server:settimeout( 0 ) - _readlistlen = addsocket(_readlist, server, _readlistlen) - _server[ port ] = handler - _socketlist[ server ] = handler - out_put( "server.lua: new server listener on '", addr, ":", port, "'" ) - return handler -end - -getserver = function ( port ) - return _server[ port ]; -end - -removeserver = function( port ) - local handler = _server[ port ] - if not handler then - return nil, "no server found on port '" .. tostring( port ) .. "'" - end - handler.close( ) - _server[ port ] = nil - return true -end - -closeall = function( ) - for _, handler in pairs( _socketlist ) do - handler.close( ) - _socketlist[ _ ] = nil - end - _readlistlen = 0 - _sendlistlen = 0 - _timerlistlen = 0 - _server = { } - _readlist = { } - _sendlist = { } - _timerlist = { } - _socketlist = { } - --mem_free( ) -end - -getsettings = function( ) - return _selecttimeout, _sleeptime, _maxsendlen, _maxreadlen, _checkinterval, _sendtimeout, _readtimeout, _cleanqueue, _maxclientsperserver, _maxsslhandshake -end - -changesettings = function( new ) - if type( new ) ~= "table" then - return nil, "invalid settings table" - end - _selecttimeout = tonumber( new.timeout ) or _selecttimeout - _sleeptime = tonumber( new.sleeptime ) or _sleeptime - _maxsendlen = tonumber( new.maxsendlen ) or _maxsendlen - _maxreadlen = tonumber( new.maxreadlen ) or _maxreadlen - _checkinterval = tonumber( new.checkinterval ) or _checkinterval - _sendtimeout = tonumber( new.sendtimeout ) or _sendtimeout - _readtimeout = tonumber( new.readtimeout ) or _readtimeout - _cleanqueue = new.cleanqueue - _maxclientsperserver = new._maxclientsperserver or _maxclientsperserver - _maxsslhandshake = new._maxsslhandshake or _maxsslhandshake - return true -end - -addtimer = function( listener ) - if type( listener ) ~= "function" then - return nil, "invalid listener function" - end - _timerlistlen = _timerlistlen + 1 - _timerlist[ _timerlistlen ] = listener - return true -end - -stats = function( ) - return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen -end - -local dontstop = true; -- thinking about tomorrow, ... - -setquitting = function (quit) - dontstop = not quit; - return; -end - -loop = function( ) -- this is the main loop of the program - while dontstop do - local read, write, err = socket_select( _readlist, _sendlist, _selecttimeout ) - for i, socket in ipairs( write ) do -- send data waiting in writequeues - local handler = _socketlist[ socket ] - if handler then - handler.sendbuffer( ) - else - closesocket( socket ) - out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen - end - end - for i, socket in ipairs( read ) do -- receive data - local handler = _socketlist[ socket ] - if handler then - handler.readbuffer( ) - else - closesocket( socket ) - out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen - end - end - for handler, err in pairs( _closelist ) do - handler.disconnect( )( handler, err ) - handler.close( true ) -- forced disconnect - end - clean( _closelist ) - _currenttime = os_time( ) - if os_difftime( _currenttime - _timer ) >= 1 then - for i = 1, _timerlistlen do - _timerlist[ i ]( _currenttime ) -- fire timers - end - _timer = _currenttime - end - socket_sleep( _sleeptime ) -- wait some time - --collectgarbage( ) - end - return "quitting" -end - ---// EXPERIMENTAL //-- - -local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx, startssl ) - local handler = wrapconnection( nil, listeners, socket, ip, serverport, "clientport", pattern, sslctx, startssl ) - _socketlist[ socket ] = handler - _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) - return handler, socket -end - -local addclient = function( address, port, listeners, pattern, sslctx, startssl ) - local client, err = luasocket.tcp( ) - if err then - return nil, err - end - client:settimeout( 0 ) - _, err = client:connect( address, port ) - if err then -- try again - local handler = wrapclient( client, address, port, listeners ) - else - wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx, startssl ) - end -end - ---// EXPERIMENTAL //-- - -----------------------------------// BEGIN //-- - -use "setmetatable" ( _socketlist, { __mode = "k" } ) -use "setmetatable" ( _readtimes, { __mode = "k" } ) -use "setmetatable" ( _writetimes, { __mode = "k" } ) - -_timer = os_time( ) -_starttime = os_time( ) - -addtimer( function( ) - local difftime = os_difftime( _currenttime - _starttime ) - if difftime > _checkinterval then - _starttime = _currenttime - for handler, timestamp in pairs( _writetimes ) do - if os_difftime( _currenttime - timestamp ) > _sendtimeout then - --_writetimes[ handler ] = nil - handler.disconnect( )( handler, "send timeout" ) - handler.close( true ) -- forced disconnect - end - end - for handler, timestamp in pairs( _readtimes ) do - if os_difftime( _currenttime - timestamp ) > _readtimeout then - --_readtimes[ handler ] = nil - handler.disconnect( )( handler, "read timeout" ) - handler.close( ) -- forced disconnect? - end - end - end - end -) - -----------------------------------// PUBLIC INTERFACE //-- - -return { - - addclient = addclient, - wrapclient = wrapclient, - - loop = loop, - stats = stats, - closeall = closeall, - addtimer = addtimer, - addserver = addserver, - getserver = getserver, - getsettings = getsettings, - setquitting = setquitting, - removeserver = removeserver, - changesettings = changesettings, -} + +local use_luaevent = require "core.configmanager".get("*", "core", "use_libevent"); + +if use_luaevent then + use_luaevent = pcall(require, "luaevent.core"); + if not use_luaevent then + log("error", "libevent not found, falling back to select()"); + end +end + +local server; + +if use_luaevent then + server = require "net.server_event"; + -- util.timer requires "net.server", so instead of having + -- Lua look for, and load us again (causing a loop) - set this here + -- (usually it isn't set until we return, look down there...) + package.loaded["net.server"] = server; + + -- Backwards compatibility for timers, addtimer + -- called a function roughly every second + local add_task = require "util.timer".add_task; + function server.addtimer(f) + return add_task(1, function (...) f(...); return 1; end); + end +else + server = require "net.server_select"; + package.loaded["net.server"] = server; +end + +-- require "net.server" shall now forever return this, +-- ie. server_select or server_event as chosen above. +return server; diff -r 7cb6460b18d8 -r 141896297cea net/server_event.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/net/server_event.lua Wed Nov 25 17:39:23 2009 +0000 @@ -0,0 +1,764 @@ +--[[ + + + server.lua based on lua/libevent by blastbeat + + notes: + -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE + -- you cant even register a new EV_READ/EV_WRITE callback inside another one + -- never call eventcallback:close( ) from inside eventcallback + -- to do some of the above, use timeout events or something what will called from outside + -- dont let garbagecollect eventcallbacks, as long they are running + -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing + +--]] + + +local SCRIPT_NAME = "server_event.lua" +local SCRIPT_VERSION = "0.05" +local SCRIPT_AUTHOR = "blastbeat" +local LAST_MODIFIED = "2009/11/20" + +local cfg = { + MAX_CONNECTIONS = 100000, -- max per server connections (use "ulimit -n" on *nix) + MAX_HANDSHAKE_ATTEMPS = 10, -- attemps to finish ssl handshake + HANDSHAKE_TIMEOUT = 1, -- timout in seconds per handshake attemp + MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets + MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) + ACCEPT_DELAY = 10, -- seconds to wait until the next attemp of a full server to accept + READ_TIMEOUT = 60 * 30, -- timeout in seconds for read data from socket + WRITE_TIMEOUT = 30, -- timeout in seconds for write data on socket + CONNECT_TIMEOUT = 10, -- timeout in seconds for connection attemps + CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) + DEBUG = true, -- show debug messages +} + +local function use(x) return rawget(_G, x); end +local print = use "print" +local pcall = use "pcall" +local ipairs = use "ipairs" +local string = use "string" +local select = use "select" +local require = use "require" +local tostring = use "tostring" +local coroutine = use "coroutine" +local setmetatable = use "setmetatable" + +local ssl = use "ssl" +local socket = use "socket" + +local log = require ("util.logger").init("socket") + +local function debug(...) + return log("debug", ("%s "):rep(select('#', ...)), ...) +end +local vdebug = debug; + +local bitor = ( function( ) -- thx Rici Lake + local hasbit = function( x, p ) + return x % ( p + p ) >= p + end + return function( x, y ) + local p = 1 + local z = 0 + local limit = x > y and x or y + while p <= limit do + if hasbit( x, p ) or hasbit( y, p ) then + z = z + p + end + p = p + p + end + return z + end +end )( ) + +local event = require "luaevent.core" +local base = event.new( ) +local EV_READ = event.EV_READ +local EV_WRITE = event.EV_WRITE +local EV_TIMEOUT = event.EV_TIMEOUT + +local EV_READWRITE = bitor( EV_READ, EV_WRITE ) + +local interfacelist = ( function( ) -- holds the interfaces for sockets + local array = { } + local len = 0 + return function( method, arg ) + if "add" == method then + len = len + 1 + array[ len ] = arg + arg:_position( len ) + return len + elseif "delete" == method then + if len <= 0 then + return nil, "array is already empty" + end + local position = arg:_position() -- get position in array + if position ~= len then + local interface = array[ len ] -- get last interface + array[ position ] = interface -- copy it into free position + array[ len ] = nil -- free last position + interface:_position( position ) -- set new position in array + else -- free last position + array[ len ] = nil + end + len = len - 1 + return len + else + return array + end + end +end )( ) + +-- Client interface methods +local interface_mt +do + interface_mt = {}; interface_mt.__index = interface_mt; + + local addevent = base.addevent + local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield + local string_len = string.len + + -- Private methods + function interface_mt:_position(new_position) + self.position = new_position or self.position + return self.position; + end + function interface_mt:_close() -- regs event to start self:_destroy() + local callback = function( ) + self:_destroy(); + self.eventclose = nil + return -1 + end + self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) + return true + end + + function interface_mt:_start_connection(plainssl) -- should be called from addclient + local callback = function( event ) + if EV_TIMEOUT == event then -- timout during connection + self.fatalerror = "connection timeout" + self.listener.ontimeout( self ) -- call timeout listener + self:_close() + debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) + else + if plainssl then -- start ssl session + self:_start_ssl( self.listener.onconnect ) + else -- normal connection + self:_start_session( self.listener.onconnect ) + end + debug( "new connection established. id:", self.id ) + end + self.eventconnect = nil + return -1 + end + self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) + return true + end + function interface_mt:_start_session(onconnect) -- new session, for example after startssl + if self.type == "client" then + local callback = function( ) + self:_lock( false, false, false ) + --vdebug( "start listening on client socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ) -- register callback + self:onconnect() + self.eventsession = nil + return -1 + end + self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) + else + self:_lock( false ) + --vdebug( "start listening on server socket with id:", self.id ) + self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback + end + return true + end + function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first + --vdebug( "starting ssl session with client id:", self.id ) + local _ + _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + _ = self.eventwrite and self.eventwrite:close( ) + self.eventread, self.eventwrite = nil, nil + local err + self.conn, err = ssl.wrap( self.conn, self._sslctx ) + if err then + self.fatalerror = err + self.conn = nil -- cannot be used anymore + if "onconnect" == arg then + self.ondisconnect = nil -- dont call this when client isnt really connected + end + self:_close() + debug( "fatal error while ssl wrapping:", err ) + return false + end + self.conn:settimeout( 0 ) -- set non blocking + local handshakecallback = coroutine_wrap( + function( event ) + local _, err + local attempt = 0 + local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS + while attempt < 1000 do -- no endless loop + attempt = attempt + 1 + debug( "ssl handshake of client with id:"..tostring(self).."attemp:"..attempt ) + if attempt > maxattempt then + self.fatalerror = "max handshake attemps exceeded" + elseif EV_TIMEOUT == event then + self.fatalerror = "timeout during handshake" + else + _, err = self.conn:dohandshake( ) + if not err then + self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed + self.send = self.conn.send -- caching table lookups with new client object + self.receive = self.conn.receive + local onsomething + if "onconnect" == arg then -- trigger listener + onsomething = self.onconnect + else + onsomething = self.onsslconnection + end + self:_start_session( onsomething ) + debug( "ssl handshake done" ) + self.eventhandshake = nil + return -1 + end + debug( "error during ssl handshake:", err ) + if err == "wantwrite" then + event = EV_WRITE + elseif err == "wantread" then + event = EV_READ + else + self.fatalerror = err + end + end + if self.fatalerror then + if "onconnect" == arg then + self.ondisconnect = nil -- dont call this when client isnt really connected + end + self:_close() + debug( "handshake failed because:", self.fatalerror ) + self.eventhandshake = nil + return -1 + end + event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... + end + end + ) + debug "starting handshake..." + self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked + self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) + return true + end + function interface_mt:_destroy() -- close this interface + events and call last listener + debug( "closing client with id:", self.id ) + self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions + local _ + _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! + if self.type == "client" then + _ = self.eventwrite and self.eventwrite:close( ) + _ = self.eventhandshake and self.eventhandshake:close( ) + _ = self.eventstarthandshake and self.eventstarthandshake:close( ) + _ = self.eventconnect and self.eventconnect:close( ) + _ = self.eventsession and self.eventsession:close( ) + _ = self.eventwritetimeout and self.eventwritetimeout:close( ) + _ = self.eventreadtimeout and self.eventreadtimeout:close( ) + _ = self.ondisconnect and self:ondisconnect( self.fatalerror ) -- call ondisconnect listener (wont be the case if handshake failed on connect) + _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! + self._server:counter(-1); + self.eventread, self.eventwrite = nil, nil + self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil + self.readcallback, self.writecallback = nil, nil + else + self.conn:close( ) + self.eventread, self.eventclose = nil, nil + self.interface, self.readcallback = nil, nil + end + interfacelist( "delete", self ) + return true + end + + function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events + self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting + return nointerface, noreading, nowriting + end + + function interface_mt:counter(c) + if c then + self._connections = self._connections - c + end + return self._connections + end + + -- Public methods + function interface_mt:write(data) + if self.nowriting then return nil, "locked" end + --vdebug( "try to send data to client, id/data:", self.id, data ) + data = tostring( data ) + local len = string_len( data ) + local total = len + self.writebufferlen + if total > cfg.MAX_SEND_LENGTH then -- check buffer length + local err = "send buffer exceeded" + debug( "error:", err ) -- to much, check your app + return nil, err + end + self.writebuffer = self.writebuffer .. data -- new buffer + self.writebufferlen = total + if not self.eventwrite then -- register new write event + --vdebug( "register new write event" ) + self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) + end + return true + end + function interface_mt:close(now) + if self.nointerface then return nil, "locked"; end + debug( "try to close client connection with id:", self.id ) + if self.type == "client" then + self.fatalerror = "client to close" + if ( not self.eventwrite ) or now then -- try to close immediately + self:_lock( true, true, true ) + self:_close() + return true + else -- wait for incomplete write request + self:_lock( true, true, false ) + debug "closing delayed until writebuffer is empty" + return nil, "writebuffer not empty, waiting" + end + else + debug( "try to close server with id:", self.id, "args:", now ) + self.fatalerror = "server to close" + self:_lock( true ) + local count = 0 + for _, item in ipairs( interfacelist( ) ) do + if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match + if item:close( now ) then -- writebuffer was empty + count = count + 1 + end + end + end + local timeout = 0 -- dont wait for unfinished writebuffers of clients... + if not now then + timeout = cfg.WRITE_TIMEOUT -- ...or wait for it + end + self:_close( timeout ) -- add new event to remove the server interface + debug( "seconds remained until server is closed:", timeout ) + return count -- returns finished clients with empty writebuffer + end + end + + function interface_mt:server() + return self._server or self; + end + + function interface_mt:port() + return self._port + end + + function interface_mt:ip() + return self._ip + end + + function interface_mt:ssl() + return self._usingssl + end + + function interface_mt:type() + return self._type or "client" + end + + function interface_mt:connections() + return self._connections + end + + function interface_mt:address() + return self.addr + end + + function interface_mt:set_sslctx(sslctx) + self._sslctx = sslctx; + end + + function interface_mt:starttls() + debug( "try to start ssl at client id:", self.id ) + local err + if not self._sslctx then -- no ssl available + err = "no ssl context available" + elseif self._usingssl then -- startssl was already called + err = "ssl already active" + end + if err then + debug( "error:", err ) + return nil, err + end + self._usingssl = true + self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event + self.startsslcallback = nil + self:_start_ssl(); + self.eventstarthandshake = nil + return -1 + end + if not self.eventwrite then + self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake + self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake + else -- wait until writebuffer is empty + self:_lock( true, true, false ) + debug "ssl session delayed until writebuffer is empty..." + end + return true + end + + function interface_mt.onconnect() + end +end + +-- End of client interface methods + +local handleclient; +do + local string_sub = string.sub -- caching table lookups + local string_len = string.len + local addevent = base.addevent + local coroutine_wrap = coroutine.wrap + local socket_gettime = socket.gettime + local coroutine_yield = coroutine.yield + function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface + --vdebug("creating client interfacce...") + local interface = { + type = "client"; + conn = client; + currenttime = socket_gettime( ); -- safe the origin + writebuffer = ""; -- writebuffer + writebufferlen = 0; -- length of writebuffer + send = client.send; -- caching table lookups + receive = client.receive; + onconnect = listener.onconnect; -- will be called when client disconnects + ondisconnect = listener.ondisconnect; -- will be called when client disconnects + onincoming = listener.onincoming; -- will be called when client sends data + eventread = false, eventwrite = false, eventclose = false, + eventhandshake = false, eventstarthandshake = false; -- event handler + eventconnect = false, eventsession = false; -- more event handler... + eventwritetimeout = false; -- even more event handler... + eventreadtimeout = false; + fatalerror = false; -- error message + writecallback = false; -- will be called on write events + readcallback = false; -- will be called on read events + nointerface = true; -- lock/unlock parameter of this interface + noreading = false, nowriting = false; -- locks of the read/writecallback + startsslcallback = false; -- starting handshake callback + position = false; -- position of client in interfacelist + + -- Properties + _ip = ip, _port = port, _server = server, _pattern = pattern, + _sslctx = sslctx; -- parameters + _usingssl = false; -- client is using ssl; + } + interface.id = tostring(interface):match("%x+$"); + interface.writecallback = function( event ) -- called on write events + --vdebug( "new client write event, id/ip/port:", interface, ip, port ) + if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event + --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) + interface.eventwrite = false + return -1 + end + if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect + interface.fatalerror = "timeout during writing" + debug( "writing failed:", interface.fatalerror ) + interface:_close() + interface.eventwrite = false + return -1 + else -- can write :) + if interface._usingssl then -- handle luasec + if interface.eventreadtimeout then -- we have to read first + local ret = interface.readcallback( ) -- call readcallback + --vdebug( "tried to read in writecallback, result:", ret ) + end + if interface.eventwritetimeout then -- luasec only + interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error + interface.eventwritetimeout = false + end + end + local succ, err, byte = interface.conn:send( interface.writebuffer, 1, interface.writebufferlen ) + --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) + if succ then -- writing succesful + interface.writebuffer = "" + interface.writebufferlen = 0 + if interface.fatalerror then + debug "closing client after writing" + interface:_close() -- close interface if needed + elseif interface.startsslcallback then -- start ssl connection if needed + debug "starting ssl handshake after writing" + interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) + elseif interface.eventreadtimeout then + return EV_WRITE, EV_TIMEOUT + end + interface.eventwrite = nil + return -1 + elseif byte then -- want write again + --vdebug( "writebuffer is not empty:", err ) + interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer + interface.writebufferlen = interface.writebufferlen - byte + if "wantread" == err then -- happens only with luasec + local callback = function( ) + interface:_close() + interface.eventwritetimeout = nil + return evreturn, evtimeout + end + interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event + debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." ) + -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent + return -1 + end + return EV_WRITE, cfg.WRITE_TIMEOUT + else -- connection was closed during writing or fatal error + interface.fatalerror = err or "fatal error" + debug( "connection failed in write event:", interface.fatalerror ) + interface:_close() + interface.eventwrite = nil + return -1 + end + end + end + + interface.readcallback = function( event ) -- called on read events + --vdebug( "new client read event, id/ip/port:", interface, ip, port ) + if interface.noreading or interface.fatalerror then -- leave this event + --vdebug( "leaving this event because:", interface.noreading or interface.fatalerror ) + interface.eventread = nil + return -1 + end + if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect + interface.fatalerror = "timeout during receiving" + debug( "connection failed:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + else -- can read + if interface._usingssl then -- handle luasec + if interface.eventwritetimeout then -- ok, in the past writecallback was regged + local ret = interface.writecallback( ) -- call it + --vdebug( "tried to write in readcallback, result:", ret ) + end + if interface.eventreadtimeout then + interface.eventreadtimeout:close( ) + interface.eventreadtimeout = nil + end + end + local buffer, err, part = interface.conn:receive( pattern ) -- receive buffer with "pattern" + --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) + buffer = buffer or part or "" + local len = string_len( buffer ) + if len > cfg.MAX_READ_LENGTH then -- check buffer length + interface.fatalerror = "receive buffer exceeded" + debug( "fatal error:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + end + if err and ( err ~= "timeout" and err ~= "wantread" ) then + if "wantwrite" == err then -- need to read on write event + if not interface.eventwrite then -- register new write event if needed + interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) + end + interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, + function( ) + interface:_close() + end, cfg.READ_TIMEOUT + ) + debug( "wantwrite during read attemp, reg it in writecallback but dont know what really happens next..." ) + -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... + else -- connection was closed or fatal error + interface.fatalerror = err + debug( "connection failed in read event:", interface.fatalerror ) + interface:_close() + interface.eventread = nil + return -1 + end + end + interface.onincoming( interface, buffer, err ) -- send new data to listener + return EV_READ, cfg.READ_TIMEOUT + end + end + + client:settimeout( 0 ) -- set non blocking + setmetatable(interface, interface_mt) + interfacelist( "add", interface ) -- add to interfacelist + return interface + end +end + +local handleserver +do + function handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- creates an server interface + debug "creating server interface..." + local interface = { + _connections = 0; + + conn = server; + onconnect = listener.onconnect; -- will be called when new client connected + eventread = false; -- read event handler + eventclose = false; -- close event handler + readcallback = false; -- read event callback + fatalerror = false; -- error message + nointerface = true; -- lock/unlock parameter + } + interface.id = tostring(interface):match("%x+$"); + interface.readcallback = function( event ) -- server handler, called on incoming connections + --vdebug( "server can accept, id/addr/port:", interface, addr, port ) + if interface.fatalerror then + --vdebug( "leaving this event because:", self.fatalerror ) + interface.eventread = nil + return -1 + end + local delay = cfg.ACCEPT_DELAY + if EV_TIMEOUT == event then + if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count + debug( "to many connections, seconds to wait for next accept:", delay ) + return EV_TIMEOUT, delay -- timeout... + else + return EV_READ -- accept again + end + end + --vdebug("max connection check ok, accepting...") + local client, err = server:accept() -- try to accept; TODO: check err + while client do + if interface._connections >= cfg.MAX_CONNECTIONS then + client:close( ) -- refuse connection + debug( "maximal connections reached, refuse client connection; accept delay:", delay ) + return EV_TIMEOUT, delay -- delay for next accept attemp + end + local ip, port = client:getpeername( ) + interface._connections = interface._connections + 1 -- increase connection count + local clientinterface = handleclient( client, ip, port, interface, pattern, listener, nil, sslctx ) + --vdebug( "client id:", clientinterface, "startssl:", startssl ) + if startssl then + clientinterface:_start_ssl( clientinterface.onconnect ) + else + clientinterface:_start_session( clientinterface.onconnect ) + end + debug( "accepted incoming client connection from:", ip, port ) + client, err = server:accept() -- try to accept again + end + return EV_READ + end + + server:settimeout( 0 ) + setmetatable(interface, interface_mt) + interfacelist( "add", interface ) + interface:_start_session() + return interface + end +end + +local addserver = ( function( ) + return function( addr, port, listener, pattern, sslcfg, startssl ) -- TODO: check arguments + --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") + local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket + if not server then + debug( "creating server socket failed because:", err ) + return nil, err + end + local sslctx + if sslcfg then + if not ssl then + debug "fatal error: luasec not found" + return nil, "luasec not found" + end + sslctx, err = ssl.newcontext( sslcfg ) + if err then + debug( "error while creating new ssl context for server socket:", err ) + return nil, err + end + end + local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler + debug( "new server created with id:", tostring(interface)) + return interface + end +end )( ) + +local wrapclient = ( function( ) + return function( client, addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) + debug( "try to connect to:", addr, serverport, "with parameters:", pattern, localaddr, localport, sslcfg, startssl ) + local sslctx + if sslcfg then -- handle ssl/new context + if not ssl then + debug "need luasec, but not available" + return nil, "luasec not found" + end + sslctx, err = ssl.newcontext( sslcfg ) + if err then + debug( "cannot create new ssl context:", err ) + return nil, err + end + end + end +end )( ) + +local addclient = ( function( ) + return function( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) + local client, err = socket.tcp() -- creating new socket + if not client then + debug( "cannot create socket:", err ) + return nil, err + end + client:settimeout( 0 ) -- set nonblocking + if localaddr then + local res, err = client:bind( localaddr, localport, -1 ) + if not res then + debug( "cannot bind client:", err ) + return nil, err + end + end + local res, err = client:connect( addr, serverport ) -- connect + if res or ( err == "timeout" ) then + local ip, port = client:getsockname( ) + local server = function( ) + return nil, "this is a dummy server interface" + end + local interface = handleclient( client, ip, port, server, pattern, listener, sslctx ) + interface:_start_connection( startssl ) + debug( "new connection id:", interface ) + return interface, err + else + debug( "new connection failed:", err ) + return nil, err + end + return wrapclient( client, addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) + end +end )( ) + +local loop = function( ) -- starts the event loop + return base:loop( ) +end + +local newevent = ( function( ) + local add = base.addevent + return function( ... ) + return add( base, ... ) + end +end )( ) + +local closeallservers = function( arg ) + for _, item in ipairs( interfacelist( ) ) do + if item "type" == "server" then + item( "close", arg ) + end + end +end + +return { + + cfg = cfg, + base = base, + loop = loop, + event = event, + event_base = base, + addevent = newevent, + addserver = addserver, + addclient = addclient, + wrapclient = wrapclient, + closeallservers = closeallservers, + + __NAME = SCRIPT_NAME, + __DATE = LAST_MODIFIED, + __AUTHOR = SCRIPT_AUTHOR, + __VERSION = SCRIPT_VERSION, + +} diff -r 7cb6460b18d8 -r 141896297cea net/server_select.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/net/server_select.lua Wed Nov 25 17:39:23 2009 +0000 @@ -0,0 +1,914 @@ +-- +-- server.lua by blastbeat of the luadch project +-- Re-used here under the MIT/X Consortium License +-- +-- Modifications (C) 2008-2009 Matthew Wild, Waqas Hussain +-- + +-- // wrapping luadch stuff // -- + +local use = function( what ) + return _G[ what ] +end +local clean = function( tbl ) + for i, k in pairs( tbl ) do + tbl[ i ] = nil + end +end + +local log, table_concat = require ("util.logger").init("socket"), table.concat; +local out_put = function (...) return log("debug", table_concat{...}); end +local out_error = function (...) return log("warn", table_concat{...}); end +local mem_free = collectgarbage + +----------------------------------// DECLARATION //-- + +--// constants //-- + +local STAT_UNIT = 1 -- byte + +--// lua functions //-- + +local type = use "type" +local pairs = use "pairs" +local ipairs = use "ipairs" +local tostring = use "tostring" +local collectgarbage = use "collectgarbage" + +--// lua libs //-- + +local os = use "os" +local table = use "table" +local string = use "string" +local coroutine = use "coroutine" + +--// lua lib methods //-- + +local os_time = os.time +local os_difftime = os.difftime +local table_concat = table.concat +local table_remove = table.remove +local string_len = string.len +local string_sub = string.sub +local coroutine_wrap = coroutine.wrap +local coroutine_yield = coroutine.yield + +--// extern libs //-- + +local luasec = select( 2, pcall( require, "ssl" ) ) +local luasocket = require "socket" + +--// extern lib methods //-- + +local ssl_wrap = ( luasec and luasec.wrap ) +local socket_bind = luasocket.bind +local socket_sleep = luasocket.sleep +local socket_select = luasocket.select +local ssl_newcontext = ( luasec and luasec.newcontext ) + +--// functions //-- + +local id +local loop +local stats +local idfalse +local addtimer +local closeall +local addserver +local getserver +local wrapserver +local getsettings +local closesocket +local removesocket +local removeserver +local changetimeout +local wrapconnection +local changesettings + +--// tables //-- + +local _server +local _readlist +local _timerlist +local _sendlist +local _socketlist +local _closelist +local _readtimes +local _writetimes + +--// simple data types //-- + +local _ +local _readlistlen +local _sendlistlen +local _timerlistlen + +local _sendtraffic +local _readtraffic + +local _selecttimeout +local _sleeptime + +local _starttime +local _currenttime + +local _maxsendlen +local _maxreadlen + +local _checkinterval +local _sendtimeout +local _readtimeout + +local _cleanqueue + +local _timer + +local _maxclientsperserver + +----------------------------------// DEFINITION //-- + +_server = { } -- key = port, value = table; list of listening servers +_readlist = { } -- array with sockets to read from +_sendlist = { } -- arrary with sockets to write to +_timerlist = { } -- array of timer functions +_socketlist = { } -- key = socket, value = wrapped socket (handlers) +_readtimes = { } -- key = handler, value = timestamp of last data reading +_writetimes = { } -- key = handler, value = timestamp of last data writing/sending +_closelist = { } -- handlers to close + +_readlistlen = 0 -- length of readlist +_sendlistlen = 0 -- length of sendlist +_timerlistlen = 0 -- lenght of timerlist + +_sendtraffic = 0 -- some stats +_readtraffic = 0 + +_selecttimeout = 1 -- timeout of socket.select +_sleeptime = 0 -- time to wait at the end of every loop + +_maxsendlen = 51000 * 1024 -- max len of send buffer +_maxreadlen = 25000 * 1024 -- max len of read buffer + +_checkinterval = 1200000 -- interval in secs to check idle clients +_sendtimeout = 60000 -- allowed send idle time in secs +_readtimeout = 6 * 60 * 60 -- allowed read idle time in secs + +_cleanqueue = false -- clean bufferqueue after using + +_maxclientsperserver = 1000 + +_maxsslhandshake = 30 -- max handshake round-trips +----------------------------------// PRIVATE //-- + +wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx, maxconnections, startssl ) -- this function wraps a server + + maxconnections = maxconnections or _maxclientsperserver + + local connections = 0 + + local dispatch, disconnect = listeners.onincoming, listeners.ondisconnect + + local err + + local ssl = false + + if sslctx then + ssl = true + if not ssl_newcontext then + out_error "luasec not found" + ssl = false + end + if type( sslctx ) ~= "table" then + out_error "server.lua: wrong server sslctx" + ssl = false + end + local ctx; + ctx, err = ssl_newcontext( sslctx ) + if not ctx then + err = err or "wrong sslctx parameters" + local file; + file = err:match("^error loading (.-) %("); + if file then + if file == "private key" then + file = sslctx.key or "your private key"; + elseif file == "certificate" then + file = sslctx.certificate or "your certificate file"; + end + local reason = err:match("%((.+)%)$") or "some reason"; + if reason == "Permission denied" then + reason = "Check that the permissions allow Prosody to read this file."; + elseif reason == "No such file or directory" then + reason = "Check that the path is correct, and the file exists."; + elseif reason == "system lib" then + reason = "Previous error (see logs), or other system error."; + else + reason = "Reason: "..tostring(reason or "unknown"):lower(); + end + log("error", "SSL/TLS: Failed to load %s: %s", file, reason); + else + log("error", "SSL/TLS: Error initialising for port %d: %s", serverport, err ); + end + ssl = false + end + sslctx = ctx; + end + if not ssl then + sslctx = false; + if startssl then + log("error", "Failed to listen on port %d due to SSL/TLS to SSL/TLS initialisation errors (see logs)", serverport ) + return nil, "Cannot start ssl, see log for details" + end + end + + local accept = socket.accept + + --// public methods of the object //-- + + local handler = { } + + handler.shutdown = function( ) end + + handler.ssl = function( ) + return ssl + end + handler.sslctx = function( ) + return sslctx + end + handler.remove = function( ) + connections = connections - 1 + end + handler.close = function( ) + for _, handler in pairs( _socketlist ) do + if handler.serverport == serverport then + handler.disconnect( handler, "server closed" ) + handler:close( true ) + end + end + socket:close( ) + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _socketlist[ socket ] = nil + handler = nil + socket = nil + --mem_free( ) + out_put "server.lua: closed server handler and removed sockets from list" + end + handler.ip = function( ) + return ip + end + handler.serverport = function( ) + return serverport + end + handler.socket = function( ) + return socket + end + handler.readbuffer = function( ) + if connections > maxconnections then + out_put( "server.lua: refused new client connection: server full" ) + return false + end + local client, err = accept( socket ) -- try to accept + if client then + local ip, clientport = client:getpeername( ) + client:settimeout( 0 ) + local handler, client, err = wrapconnection( handler, listeners, client, ip, serverport, clientport, pattern, sslctx, startssl ) -- wrap new client socket + if err then -- error while wrapping ssl socket + return false + end + connections = connections + 1 + out_put( "server.lua: accepted new client connection from ", tostring(ip), ":", tostring(clientport), " to ", tostring(serverport)) + return dispatch( handler ) + elseif err then -- maybe timeout or something else + out_put( "server.lua: error with new client connection: ", tostring(err) ) + return false + end + end + return handler +end + +wrapconnection = function( server, listeners, socket, ip, serverport, clientport, pattern, sslctx, startssl ) -- this function wraps a client to a handler object + + socket:settimeout( 0 ) + + --// local import of socket methods //-- + + local send + local receive + local shutdown + + --// private closures of the object //-- + + local ssl + + local dispatch = listeners.onincoming + local status = listeners.status + local disconnect = listeners.ondisconnect + + local bufferqueue = { } -- buffer array + local bufferqueuelen = 0 -- end of buffer array + + local toclose + local fatalerror + local needtls + + local bufferlen = 0 + + local noread = false + local nosend = false + + local sendtraffic, readtraffic = 0, 0 + + local maxsendlen = _maxsendlen + local maxreadlen = _maxreadlen + + --// public methods of the object //-- + + local handler = bufferqueue -- saves a table ^_^ + + handler.dispatch = function( ) + return dispatch + end + handler.disconnect = function( ) + return disconnect + end + handler.setlistener = function( self, listeners ) + dispatch = listeners.onincoming + disconnect = listeners.ondisconnect + end + handler.getstats = function( ) + return readtraffic, sendtraffic + end + handler.ssl = function( ) + return ssl + end + handler.sslctx = function ( ) + return sslctx + end + handler.send = function( _, data, i, j ) + return send( socket, data, i, j ) + end + handler.receive = function( pattern, prefix ) + return receive( socket, pattern, prefix ) + end + handler.shutdown = function( pattern ) + return shutdown( socket, pattern ) + end + handler.close = function( forced ) + if not handler then return true; end + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _readtimes[ handler ] = nil + if bufferqueuelen ~= 0 then + if not ( forced or fatalerror ) then + handler.sendbuffer( ) + if bufferqueuelen ~= 0 then -- try again... + if handler then + handler.write = nil -- ... but no further writing allowed + end + toclose = true + return false + end + else + send( socket, table_concat( bufferqueue, "", 1, bufferqueuelen ), 1, bufferlen ) -- forced send + end + end + if socket then + _ = shutdown and shutdown( socket ) + socket:close( ) + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _socketlist[ socket ] = nil + socket = nil + else + out_put "server.lua: socket already closed" + end + if handler then + _writetimes[ handler ] = nil + _closelist[ handler ] = nil + handler = nil + end + if server then + server.remove( ) + end + out_put "server.lua: closed client handler and removed socket from list" + return true + end + handler.ip = function( ) + return ip + end + handler.serverport = function( ) + return serverport + end + handler.clientport = function( ) + return clientport + end + local write = function( self, data ) + bufferlen = bufferlen + string_len( data ) + if bufferlen > maxsendlen then + _closelist[ handler ] = "send buffer exceeded" -- cannot close the client at the moment, have to wait to the end of the cycle + handler.write = idfalse -- dont write anymore + return false + elseif socket and not _sendlist[ socket ] then + _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) + end + bufferqueuelen = bufferqueuelen + 1 + bufferqueue[ bufferqueuelen ] = data + if handler then + _writetimes[ handler ] = _writetimes[ handler ] or _currenttime + end + return true + end + handler.write = write + handler.bufferqueue = function( self ) + return bufferqueue + end + handler.socket = function( self ) + return socket + end + handler.pattern = function( self, new ) + pattern = new or pattern + return pattern + end + handler.setsend = function ( self, newsend ) + send = newsend or send + return send + end + handler.bufferlen = function( self, readlen, sendlen ) + maxsendlen = sendlen or maxsendlen + maxreadlen = readlen or maxreadlen + return maxreadlen, maxsendlen + end + handler.lock = function( self, switch ) + if switch == true then + handler.write = idfalse + local tmp = _sendlistlen + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _writetimes[ handler ] = nil + if _sendlistlen ~= tmp then + nosend = true + end + tmp = _readlistlen + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _readtimes[ handler ] = nil + if _readlistlen ~= tmp then + noread = true + end + elseif switch == false then + handler.write = write + if noread then + noread = false + _readlistlen = addsocket(_readlist, socket, _readlistlen) + _readtimes[ handler ] = _currenttime + end + if nosend then + nosend = false + write( "" ) + end + end + return noread, nosend + end + local _readbuffer = function( ) -- this function reads data + local buffer, err, part = receive( socket, pattern ) -- receive buffer with "pattern" + if not err or ( err == "timeout" or err == "wantread" ) then -- received something + local buffer = buffer or part or "" + local len = string_len( buffer ) + if len > maxreadlen then + disconnect( handler, "receive buffer exceeded" ) + handler.close( true ) + return false + end + local count = len * STAT_UNIT + readtraffic = readtraffic + count + _readtraffic = _readtraffic + count + _readtimes[ handler ] = _currenttime + --out_put( "server.lua: read data '", buffer:gsub("[^%w%p ]", "."), "', error: ", err ) + return dispatch( handler, buffer, err ) + else -- connections was closed or fatal error + out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " read error: ", tostring(err) ) + fatalerror = true + disconnect( handler, err ) + _ = handler and handler.close( ) + return false + end + end + local _sendbuffer = function( ) -- this function sends data + local succ, err, byte, buffer, count; + local count; + if socket then + buffer = table_concat( bufferqueue, "", 1, bufferqueuelen ) + succ, err, byte = send( socket, buffer, 1, bufferlen ) + count = ( succ or byte or 0 ) * STAT_UNIT + sendtraffic = sendtraffic + count + _sendtraffic = _sendtraffic + count + _ = _cleanqueue and clean( bufferqueue ) + --out_put( "server.lua: sended '", buffer, "', bytes: ", tostring(succ), ", error: ", tostring(err), ", part: ", tostring(byte), ", to: ", tostring(ip), ":", tostring(clientport) ) + else + succ, err, count = false, "closed", 0; + end + if succ then -- sending succesful + bufferqueuelen = 0 + bufferlen = 0 + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) -- delete socket from writelist + _ = needtls and handler:starttls(true) + _writetimes[ handler ] = nil + _ = toclose and handler.close( ) + return true + elseif byte and ( err == "timeout" or err == "wantwrite" ) then -- want write + buffer = string_sub( buffer, byte + 1, bufferlen ) -- new buffer + bufferqueue[ 1 ] = buffer -- insert new buffer in queue + bufferqueuelen = 1 + bufferlen = bufferlen - byte + _writetimes[ handler ] = _currenttime + return true + else -- connection was closed during sending or fatal error + out_put( "server.lua: client ", tostring(ip), ":", tostring(clientport), " write error: ", tostring(err) ) + fatalerror = true + disconnect( handler, err ) + _ = handler and handler.close( ) + return false + end + end + + -- Set the sslctx + local handshake; + function handler.set_sslctx(self, new_sslctx) + ssl = true + sslctx = new_sslctx; + local wrote + local read + handshake = coroutine_wrap( function( client ) -- create handshake coroutine + local err + for i = 1, _maxsslhandshake do + _sendlistlen = ( wrote and removesocket( _sendlist, client, _sendlistlen ) ) or _sendlistlen + _readlistlen = ( read and removesocket( _readlist, client, _readlistlen ) ) or _readlistlen + read, wrote = nil, nil + _, err = client:dohandshake( ) + if not err then + out_put( "server.lua: ssl handshake done" ) + handler.readbuffer = _readbuffer -- when handshake is done, replace the handshake function with regular functions + handler.sendbuffer = _sendbuffer + _ = status and status( handler, "ssl-handshake-complete" ) + _readlistlen = addsocket(_readlist, client, _readlistlen) + return true + else + out_put( "server.lua: error during ssl handshake: ", tostring(err) ) + if err == "wantwrite" and not wrote then + _sendlistlen = addsocket(_sendlist, client, _sendlistlen) + wrote = true + elseif err == "wantread" and not read then + _readlistlen = addsocket(_readlist, client, _readlistlen) + read = true + else + break; + end + --coroutine_yield( handler, nil, err ) -- handshake not finished + coroutine_yield( ) + end + end + disconnect( handler, "ssl handshake failed" ) + _ = handler and handler:close( true ) -- forced disconnect + return false -- handshake failed + end + ) + end + if sslctx then -- ssl? + handler:set_sslctx(sslctx); + if startssl then -- ssl now? + --out_put("server.lua: ", "starting ssl handshake") + local err + socket, err = ssl_wrap( socket, sslctx ) -- wrap socket + if err then + out_put( "server.lua: ssl error: ", tostring(err) ) + --mem_free( ) + return nil, nil, err -- fatal error + end + socket:settimeout( 0 ) + handler.readbuffer = handshake + handler.sendbuffer = handshake + handshake( socket ) -- do handshake + if not socket then + return nil, nil, "ssl handshake failed"; + end + else + -- We're not automatically doing SSL, so we're not secure (yet) + ssl = false + handler.starttls = function( self, now ) + if not now then + --out_put "server.lua: we need to do tls, but delaying until later" + needtls = true + return + end + --out_put( "server.lua: attempting to start tls on " .. tostring( socket ) ) + local oldsocket, err = socket + socket, err = ssl_wrap( socket, sslctx ) -- wrap socket + --out_put( "server.lua: sslwrapped socket is " .. tostring( socket ) ) + if err then + out_put( "server.lua: error while starting tls on client: ", tostring(err) ) + return nil, err -- fatal error + end + + socket:settimeout( 0 ) + + -- add the new socket to our system + + send = socket.send + receive = socket.receive + shutdown = id + + _socketlist[ socket ] = handler + _readlistlen = addsocket(_readlist, socket, _readlistlen) + + -- remove traces of the old socket + + _readlistlen = removesocket( _readlist, oldsocket, _readlistlen ) + _sendlistlen = removesocket( _sendlist, oldsocket, _sendlistlen ) + _socketlist[ oldsocket ] = nil + + handler.starttls = nil + needtls = nil + + -- Secure now + ssl = true + + handler.readbuffer = handshake + handler.sendbuffer = handshake + handshake( socket ) -- do handshake + end + handler.readbuffer = _readbuffer + handler.sendbuffer = _sendbuffer + end + else -- normal connection + ssl = false + handler.readbuffer = _readbuffer + handler.sendbuffer = _sendbuffer + end + + send = socket.send + receive = socket.receive + shutdown = ( ssl and id ) or socket.shutdown + + _socketlist[ socket ] = handler + _readlistlen = addsocket(_readlist, socket, _readlistlen) + + return handler, socket +end + +id = function( ) +end + +idfalse = function( ) + return false +end + +addsocket = function( list, socket, len ) + if not list[ socket ] then + len = len + 1 + list[ len ] = socket + list[ socket ] = len + end + return len; +end + +removesocket = function( list, socket, len ) -- this function removes sockets from a list ( copied from copas ) + local pos = list[ socket ] + if pos then + list[ socket ] = nil + local last = list[ len ] + list[ len ] = nil + if last ~= socket then + list[ last ] = pos + list[ pos ] = last + end + return len - 1 + end + return len +end + +closesocket = function( socket ) + _sendlistlen = removesocket( _sendlist, socket, _sendlistlen ) + _readlistlen = removesocket( _readlist, socket, _readlistlen ) + _socketlist[ socket ] = nil + socket:close( ) + --mem_free( ) +end + +----------------------------------// PUBLIC //-- + +addserver = function( addr, port, listeners, pattern, sslctx, startssl ) -- this function provides a way for other scripts to reg a server + local err + --out_put("server.lua: autossl on ", port, " is ", startssl) + if type( listeners ) ~= "table" then + err = "invalid listener table" + end + if not type( port ) == "number" or not ( port >= 0 and port <= 65535 ) then + err = "invalid port" + elseif _server[ port ] then + err = "listeners on port '" .. port .. "' already exist" + elseif sslctx and not luasec then + err = "luasec not found" + end + if err then + out_error( "server.lua, port ", port, ": ", err ) + return nil, err + end + addr = addr or "*" + local server, err = socket_bind( addr, port ) + if err then + out_error( "server.lua, port ", port, ": ", err ) + return nil, err + end + local handler, err = wrapserver( listeners, server, addr, port, pattern, sslctx, _maxclientsperserver, startssl ) -- wrap new server socket + if not handler then + server:close( ) + return nil, err + end + server:settimeout( 0 ) + _readlistlen = addsocket(_readlist, server, _readlistlen) + _server[ port ] = handler + _socketlist[ server ] = handler + out_put( "server.lua: new server listener on '", addr, ":", port, "'" ) + return handler +end + +getserver = function ( port ) + return _server[ port ]; +end + +removeserver = function( port ) + local handler = _server[ port ] + if not handler then + return nil, "no server found on port '" .. tostring( port ) .. "'" + end + handler:close( ) + _server[ port ] = nil + return true +end + +closeall = function( ) + for _, handler in pairs( _socketlist ) do + handler:close( ) + _socketlist[ _ ] = nil + end + _readlistlen = 0 + _sendlistlen = 0 + _timerlistlen = 0 + _server = { } + _readlist = { } + _sendlist = { } + _timerlist = { } + _socketlist = { } + --mem_free( ) +end + +getsettings = function( ) + return _selecttimeout, _sleeptime, _maxsendlen, _maxreadlen, _checkinterval, _sendtimeout, _readtimeout, _cleanqueue, _maxclientsperserver, _maxsslhandshake +end + +changesettings = function( new ) + if type( new ) ~= "table" then + return nil, "invalid settings table" + end + _selecttimeout = tonumber( new.timeout ) or _selecttimeout + _sleeptime = tonumber( new.sleeptime ) or _sleeptime + _maxsendlen = tonumber( new.maxsendlen ) or _maxsendlen + _maxreadlen = tonumber( new.maxreadlen ) or _maxreadlen + _checkinterval = tonumber( new.checkinterval ) or _checkinterval + _sendtimeout = tonumber( new.sendtimeout ) or _sendtimeout + _readtimeout = tonumber( new.readtimeout ) or _readtimeout + _cleanqueue = new.cleanqueue + _maxclientsperserver = new._maxclientsperserver or _maxclientsperserver + _maxsslhandshake = new._maxsslhandshake or _maxsslhandshake + return true +end + +addtimer = function( listener ) + if type( listener ) ~= "function" then + return nil, "invalid listener function" + end + _timerlistlen = _timerlistlen + 1 + _timerlist[ _timerlistlen ] = listener + return true +end + +stats = function( ) + return _readtraffic, _sendtraffic, _readlistlen, _sendlistlen, _timerlistlen +end + +local dontstop = true; -- thinking about tomorrow, ... + +setquitting = function (quit) + dontstop = not quit; + return; +end + +loop = function( ) -- this is the main loop of the program + while dontstop do + local read, write, err = socket_select( _readlist, _sendlist, _selecttimeout ) + for i, socket in ipairs( write ) do -- send data waiting in writequeues + local handler = _socketlist[ socket ] + if handler then + handler.sendbuffer( ) + else + closesocket( socket ) + out_put "server.lua: found no handler and closed socket (writelist)" -- this should not happen + end + end + for i, socket in ipairs( read ) do -- receive data + local handler = _socketlist[ socket ] + if handler then + handler.readbuffer( ) + else + closesocket( socket ) + out_put "server.lua: found no handler and closed socket (readlist)" -- this can happen + end + end + for handler, err in pairs( _closelist ) do + handler.disconnect( )( handler, err ) + handler:close( true ) -- forced disconnect + end + clean( _closelist ) + _currenttime = os_time( ) + if os_difftime( _currenttime - _timer ) >= 1 then + for i = 1, _timerlistlen do + _timerlist[ i ]( _currenttime ) -- fire timers + end + _timer = _currenttime + end + socket_sleep( _sleeptime ) -- wait some time + --collectgarbage( ) + end + return "quitting" +end + +--// EXPERIMENTAL //-- + +local wrapclient = function( socket, ip, serverport, listeners, pattern, sslctx, startssl ) + local handler = wrapconnection( nil, listeners, socket, ip, serverport, "clientport", pattern, sslctx, startssl ) + _socketlist[ socket ] = handler + _sendlistlen = addsocket(_sendlist, socket, _sendlistlen) + return handler, socket +end + +local addclient = function( address, port, listeners, pattern, sslctx, startssl ) + local client, err = luasocket.tcp( ) + if err then + return nil, err + end + client:settimeout( 0 ) + _, err = client:connect( address, port ) + if err then -- try again + local handler = wrapclient( client, address, port, listeners ) + else + wrapconnection( nil, listeners, client, address, port, "clientport", pattern, sslctx, startssl ) + end +end + +--// EXPERIMENTAL //-- + +----------------------------------// BEGIN //-- + +use "setmetatable" ( _socketlist, { __mode = "k" } ) +use "setmetatable" ( _readtimes, { __mode = "k" } ) +use "setmetatable" ( _writetimes, { __mode = "k" } ) + +_timer = os_time( ) +_starttime = os_time( ) + +addtimer( function( ) + local difftime = os_difftime( _currenttime - _starttime ) + if difftime > _checkinterval then + _starttime = _currenttime + for handler, timestamp in pairs( _writetimes ) do + if os_difftime( _currenttime - timestamp ) > _sendtimeout then + --_writetimes[ handler ] = nil + handler.disconnect( )( handler, "send timeout" ) + handler:close( true ) -- forced disconnect + end + end + for handler, timestamp in pairs( _readtimes ) do + if os_difftime( _currenttime - timestamp ) > _readtimeout then + --_readtimes[ handler ] = nil + handler.disconnect( )( handler, "read timeout" ) + handler:close( ) -- forced disconnect? + end + end + end + end +) + +----------------------------------// PUBLIC INTERFACE //-- + +return { + + addclient = addclient, + wrapclient = wrapclient, + + loop = loop, + stats = stats, + closeall = closeall, + addtimer = addtimer, + addserver = addserver, + getserver = getserver, + getsettings = getsettings, + setquitting = setquitting, + removeserver = removeserver, + changesettings = changesettings, +} diff -r 7cb6460b18d8 -r 141896297cea net/xmppclient_listener.lua --- a/net/xmppclient_listener.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/xmppclient_listener.lua Wed Nov 25 17:39:23 2009 +0000 @@ -61,7 +61,7 @@ function session.data(conn, data) local ok, err = parser:parse(data); if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " ")); + log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); session:close("xml-not-well-formed"); end @@ -100,15 +100,15 @@ end end session.send(""); - session.conn.close(); - xmppclient.disconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed"); + session.conn:close(); + xmppclient.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed"); end end -- End of session methods -- -function xmppclient.listener(conn, data) +function xmppclient.onincoming(conn, data) local session = sessions[conn]; if not session then session = sm_new_session(conn); @@ -117,7 +117,7 @@ session.log("info", "Client connected"); -- Client is using legacy SSL (otherwise mod_tls sets this flag) - if conn.ssl() then + if conn:ssl() then session.secure = true; end @@ -133,14 +133,13 @@ end end -function xmppclient.disconnect(conn, err) +function xmppclient.ondisconnect(conn, err) local session = sessions[conn]; if session then (session.log or log)("info", "Client disconnected: %s", err); sm_destroy_session(session, err); sessions[conn] = nil; session = nil; - collectgarbage("collect"); end end diff -r 7cb6460b18d8 -r 141896297cea net/xmppcomponent_listener.lua --- a/net/xmppcomponent_listener.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/xmppcomponent_listener.lua Wed Nov 25 17:39:23 2009 +0000 @@ -118,16 +118,16 @@ end session.send(""); session.conn.close(); - component_listener.disconnect(session.conn, "stream error"); + component_listener.ondisconnect(session.conn, "stream error"); end end --- Component connlistener -function component_listener.listener(conn, data) +function component_listener.onincoming(conn, data) local session = sessions[conn]; if not session then local _send = conn.write; - session = { type = "component", conn = conn, send = function (data) return _send(tostring(data)); end }; + session = { type = "component", conn = conn, send = function (data) return _send(conn, tostring(data)); end }; sessions[conn] = session; -- Logging functions -- @@ -157,7 +157,7 @@ end end -function component_listener.disconnect(conn, err) +function component_listener.ondisconnect(conn, err) local session = sessions[conn]; if session then (session.log or log)("info", "component disconnected: %s (%s)", tostring(session.host), tostring(err)); @@ -169,7 +169,6 @@ sessions[conn] = nil; for k in pairs(session) do session[k] = nil; end session = nil; - collectgarbage("collect"); end end diff -r 7cb6460b18d8 -r 141896297cea net/xmppserver_listener.lua --- a/net/xmppserver_listener.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/net/xmppserver_listener.lua Wed Nov 25 17:39:23 2009 +0000 @@ -104,14 +104,14 @@ session.conn.close(true); -- Force FIXME: timer? end session.conn.close(); - xmppserver.disconnect(session.conn, "stream error"); + xmppserver.ondisconnect(session.conn, "stream error"); end end -- End of session methods -- -function xmppserver.listener(conn, data) +function xmppserver.onincoming(conn, data) local session = sessions[conn]; if not session then session = s2s_new_incoming(conn); @@ -148,7 +148,7 @@ end end -function xmppserver.disconnect(conn, err) +function xmppserver.ondisconnect(conn, err) local session = sessions[conn]; if session then if err and err ~= "closed" and session.srv_hosts then @@ -162,7 +162,6 @@ s2s_destroy_session(session); sessions[conn] = nil; session = nil; - collectgarbage("collect"); end end diff -r 7cb6460b18d8 -r 141896297cea plugins/mod_bosh.lua --- a/plugins/mod_bosh.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/plugins/mod_bosh.lua Wed Nov 25 17:39:23 2009 +0000 @@ -152,7 +152,7 @@ local r, send_buffer = session.requests, session.send_buffer; local response = { headers = default_headers } function session.send(s) - log("debug", "Sending BOSH data: %s", tostring(s)); + --log("debug", "Sending BOSH data: %s", tostring(s)); local oldest_request = r[1]; while oldest_request and oldest_request.destroyed do t_remove(r, 1); @@ -160,7 +160,7 @@ oldest_request = r[1]; end if oldest_request then - log("debug", "We have an open request, so using that to send with"); + log("debug", "We have an open request, so sending on that"); response.body = t_concat{"", tostring(s), "" }; oldest_request:send(response); --log("debug", "Sent"); diff -r 7cb6460b18d8 -r 141896297cea plugins/mod_console.lua --- a/plugins/mod_console.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/plugins/mod_console.lua Wed Nov 25 17:39:23 2009 +0000 @@ -33,7 +33,7 @@ console = {}; function console:new_session(conn) - local w = function(s) conn.write(s:gsub("\n", "\r\n")); end; + local w = function(s) conn:write(s:gsub("\n", "\r\n")); end; local session = { conn = conn; send = function (t) w(tostring(t)); end; print = function (t) w("| "..tostring(t).."\n"); end; @@ -53,7 +53,7 @@ local sessions = {}; -function console_listener.listener(conn, data) +function console_listener.onincoming(conn, data) local session = sessions[conn]; if not session then @@ -126,7 +126,7 @@ session.send(string.char(0)); end -function console_listener.disconnect(conn, err) +function console_listener.ondisconnect(conn, err) local session = sessions[conn]; if session then session.disconnect(); diff -r 7cb6460b18d8 -r 141896297cea plugins/mod_presence.lua --- a/plugins/mod_presence.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/plugins/mod_presence.lua Wed Nov 25 17:39:23 2009 +0000 @@ -233,6 +233,7 @@ -- TODO send last recieved unavailable presence (or we MAY do nothing, which is fine too) end else + core_route_stanza(origin, st.presence({from=to_bare, to=from_bare, type="unavailable"})); -- acknowledging receipt if not rostermanager.is_contact_pending_in(node, host, from_bare) then if rostermanager.set_contact_pending_in(node, host, from_bare) then sessionmanager.send_to_available_resources(node, host, stanza); @@ -241,14 +242,17 @@ end elseif stanza.attr.type == "unsubscribe" then if rostermanager.process_inbound_unsubscribe(node, host, from_bare) then + sessionmanager.send_to_interested_resources(node, host, stanza); rostermanager.roster_push(node, host, from_bare); end elseif stanza.attr.type == "subscribed" then if rostermanager.process_inbound_subscription_approval(node, host, from_bare) then + sessionmanager.send_to_interested_resources(node, host, stanza); rostermanager.roster_push(node, host, from_bare); end elseif stanza.attr.type == "unsubscribed" then if rostermanager.process_inbound_subscription_cancellation(node, host, from_bare) then + sessionmanager.send_to_interested_resources(node, host, stanza); rostermanager.roster_push(node, host, from_bare); end end -- discard any other type diff -r 7cb6460b18d8 -r 141896297cea plugins/mod_proxy65.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/plugins/mod_proxy65.lua Wed Nov 25 17:39:23 2009 +0000 @@ -0,0 +1,268 @@ +-- Copyright (C) 2009 Thilo Cestonaro +-- +-- This project is MIT/X11 licensed. Please see the +-- COPYING file in the source package for more information. +-- +--[[ +* to restart the proxy in the console: e.g. +module:unload("proxy65"); +> server.removeserver(); +module:load("proxy65", ); +]]-- + +if module:get_host_type() ~= "component" then + error("proxy65 should be loaded as a component, please see http://prosody.im/doc/components", 0); +end + +local jid_split = require "util.jid".split; +local st = require "util.stanza"; +local componentmanager = require "core.componentmanager"; +local config_get = require "core.configmanager".get; +local connlisteners = require "net.connlisteners"; +local sha1 = require "util.hashes".sha1; + +local host, name = module:get_host(), "SOCKS5 Bytestreams Service"; +local sessions, transfers, component, replies_cache = {}, {}, nil, {}; + +local proxy_port = config_get(host, "core", "proxy65_port") or 5000; +local proxy_interface = config_get(host, "core", "proxy65_interface") or "*"; +local proxy_address = config_get(host, "core", "proxy65_address") or (proxy_interface ~= "*" and proxy_interface) or host; +local proxy_acl = config_get(host, "core", "proxy65_acl"); + +local connlistener = { default_port = proxy_port, default_interface = proxy_interface, default_mode = "*a" }; + +function connlistener.listener(conn, data) + local session = sessions[conn] or {}; + + if session.setup == nil and data ~= nil and data:sub(1):byte() == 0x05 and data:len() > 2 then + local nmethods = data:sub(2):byte(); + local methods = data:sub(3); + local supported = false; + for i=1, nmethods, 1 do + if(methods:sub(i):byte() == 0x00) then -- 0x00 == method: NO AUTH + supported = true; + break; + end + end + if(supported) then + module:log("debug", "new session found ... ") + session.setup = true; + sessions[conn] = session; + conn:write(string.char(5, 0)); + end + return; + end + if session.setup then + if session.sha ~= nil and transfers[session.sha] ~= nil then + local sha = session.sha; + if transfers[sha].activated == true and transfers[sha].initiator == conn and transfers[sha].target ~= nil then + transfers[sha].target:write(data); + return; + end + end + if data ~= nil and data:len() == 0x2F and -- 40 == length of SHA1 HASH, and 7 other bytes => 47 => 0x2F + data:sub(1):byte() == 0x05 and -- SOCKS5 has 5 in first byte + data:sub(2):byte() == 0x01 and -- CMD must be 1 + data:sub(3):byte() == 0x00 and -- RSV must be 0 + data:sub(4):byte() == 0x03 and -- ATYP must be 3 + data:sub(5):byte() == 40 and -- SHA1 HASH length must be 40 (0x28) + data:sub(-2):byte() == 0x00 and -- PORT must be 0, size 2 byte + data:sub(-1):byte() == 0x00 + then + local sha = data:sub(6, 45); -- second param is not count! it's the ending index (included!) + if transfers[sha] == nil then + transfers[sha] = {}; + transfers[sha].activated = false; + transfers[sha].target = conn; + session.sha = sha; + module:log("debug", "target connected ... "); + elseif transfers[sha].target ~= nil then + transfers[sha].initiator = conn; + session.sha = sha; + module:log("debug", "initiator connected ... "); + end + conn:write(string.char(5, 0, 0, 3, sha:len()) .. sha .. string.char(0, 0)); -- VER, REP, RSV, ATYP, BND.ADDR (sha), BND.PORT (2 Byte) + else + log:module("warn", "Neither data transfer nor initial connect of a participator of a transfer.") + conn.close(); + end + else + if data ~= nil then + module:log("warn", "unknown connection with no authentication data -> closing it"); + conn.close(); + end + end +end + +function connlistener.disconnect(conn, err) + local session = sessions[conn]; + if session then + if session.sha and transfers[session.sha] then + local initiator, target = transfers[session.sha].initiator, transfers[session.sha].target; + if initiator == conn and target ~= nil then + target.close(); + elseif target == conn and initiator ~= nil then + initiator.close(); + end + transfers[session.sha] = nil; + end + -- Clean up any session-related stuff here + sessions[conn] = nil; + end +end + +local function get_disco_info(stanza) + local reply = replies_cache.disco_info; + if reply == nil then + reply = st.iq({type='result', from=host}):query("http://jabber.org/protocol/disco#info") + :tag("identity", {category='proxy', type='bytestreams', name=name}):up() + :tag("feature", {var="http://jabber.org/protocol/bytestreams"}); + replies_cache.disco_info = reply; + end + + reply.attr.id = stanza.attr.id; + reply.attr.to = stanza.attr.from; + return reply; +end + +local function get_disco_items(stanza) + local reply = replies_cache.disco_items; + if reply == nil then + reply = st.iq({type='result', from=host}):query("http://jabber.org/protocol/disco#items"); + replies_cache.disco_items = reply; + end + + reply.attr.id = stanza.attr.id; + reply.attr.to = stanza.attr.from; + return reply; +end + +local function _jid_join(node, host, resource) + local ret = host; + if ret then + if node then + ret = node .. "@" .. ret; + end + if resource then + ret = ret .. "/" .. resource; + end + end + return ret; +end + +local function get_stream_host(origin, stanza) + local reply = replies_cache.stream_host; + local err_reply = replies_cache.stream_host_err; + local sid = stanza.tags[1].attr.sid; + local allow = false; + local jid_node, jid_host, jid_resource = jid_split(stanza.attr.from); + + if stanza.attr.from == nil then + jid_node = origin.username; + jid_host = origin.host; + jid_resource = origin.resource; + end + + if proxy_acl and #proxy_acl > 0 then + if host ~= nil then -- at least a domain is needed. + for _, acl in ipairs(proxy_acl) do + local acl_node, acl_host, acl_resource = jid_split(acl); + if ((acl_node ~= nil and acl_node == jid_node) or acl_node == nil) and + ((acl_host ~= nil and acl_host == jid_host) or acl_host == nil) and + ((acl_resource ~= nil and acl_resource == jid_resource) or acl_resource == nil) then + allow = true; + end + end + end + else + allow = true; + end + if allow == true then + if reply == nil then + reply = st.iq({type="result", from=host}) + :query("http://jabber.org/protocol/bytestreams") + :tag("streamhost", {jid=host, host=proxy_address, port=proxy_port}); + replies_cache.stream_host = reply; + end + else + module:log("warn", "Denying use of proxy for %s", tostring(_jid_join(jid_node, jid_host, jid_resource))); + if err_reply == nil then + err_reply = st.iq({type="error", from=host}) + :query("http://jabber.org/protocol/bytestreams") + :tag("error", {code='403', type='auth'}) + :tag("forbidden", {xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'}); + replies_cache.stream_host_err = err_reply; + end + reply = err_reply; + end + reply.attr.id = stanza.attr.id; + reply.attr.to = stanza.attr.from; + reply.tags[1].attr.sid = sid; + return reply; +end + +module.unload = function() + componentmanager.deregister_component(host); + connlisteners.deregister(module.host .. ':proxy65'); +end + +local function set_activation(stanza) + local from, to, sid, reply = nil; + from = stanza.attr.from; + if stanza.tags[1] ~= nil and tostring(stanza.tags[1].name) == "query" then + if stanza.tags[1].attr ~= nil then + sid = stanza.tags[1].attr.sid; + end + if stanza.tags[1].tags[1] ~= nil and tostring(stanza.tags[1].tags[1].name) == "activate" then + to = stanza.tags[1].tags[1][1]; + end + end + if from ~= nil and to ~= nil and sid ~= nil then + reply = st.iq({type="result", from=host, to=from}); + reply.attr.id = stanza.attr.id; + end + return reply, from, to, sid; +end + +function handle_to_domain(origin, stanza) + local to_node, to_host, to_resource = jid_split(stanza.attr.to); + if to_node == nil then + local type = stanza.attr.type; + if type == "error" or type == "result" then return; end + if stanza.name == "iq" and type == "get" then + local xmlns = stanza.tags[1].attr.xmlns + if xmlns == "http://jabber.org/protocol/disco#info" then + origin.send(get_disco_info(stanza)); + return true; + elseif xmlns == "http://jabber.org/protocol/disco#items" then + origin.send(get_disco_items(stanza)); + return true; + elseif xmlns == "http://jabber.org/protocol/bytestreams" then + origin.send(get_stream_host(origin, stanza)); + return true; + end + elseif stanza.name == "iq" and type == "set" then + local reply, from, to, sid = set_activation(stanza); + if reply ~= nil and from ~= nil and to ~= nil and sid ~= nil then + local sha = sha1(sid .. from .. to, true); + if transfers[sha] == nil then + module:log("error", "transfers[sha]: nil"); + elseif(transfers[sha] ~= nil and transfers[sha].initiator ~= nil and transfers[sha].target ~= nil) then + origin.send(reply); + transfers[sha].activated = true; + end + else + module:log("error", "activation failed: sid: %s, initiator: %s, target: %s", tostring(sid), tostring(from), tostring(to)); + end + end + end + return; +end + +if not connlisteners.register(module.host .. ':proxy65', connlistener) then + error("mod_proxy65: Could not establish a connection listener. Check your configuration please."); + error(" one possible cause for this would be that two proxy65 components share the same port."); +end + +connlisteners.start(module.host .. ':proxy65'); +component = componentmanager.register_component(host, handle_to_domain); diff -r 7cb6460b18d8 -r 141896297cea plugins/mod_tls.lua --- a/plugins/mod_tls.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/plugins/mod_tls.lua Wed Nov 25 17:39:23 2009 +0000 @@ -20,9 +20,9 @@ session.send(st.stanza("proceed", { xmlns = xmlns_starttls })); session:reset_stream(); if session.host and hosts[session.host].ssl_ctx_in then - session.conn.set_sslctx(hosts[session.host].ssl_ctx_in); + session.conn:set_sslctx(hosts[session.host].ssl_ctx_in); end - session.conn.starttls(); + session.conn:starttls(); session.log("info", "TLS negotiation started..."); session.secure = false; else @@ -37,9 +37,9 @@ session.sends2s(st.stanza("proceed", { xmlns = xmlns_starttls })); session:reset_stream(); if session.to_host and hosts[session.to_host].ssl_ctx_in then - session.conn.set_sslctx(hosts[session.to_host].ssl_ctx_in); + session.conn:set_sslctx(hosts[session.to_host].ssl_ctx_in); end - session.conn.starttls(); + session.conn:starttls(); session.log("info", "TLS negotiation started for incoming s2s..."); session.secure = false; else @@ -91,7 +91,7 @@ module:log("debug", "Proceeding with TLS on s2sout..."); local format, to_host, from_host = string.format, session.to_host, session.from_host; session:reset_stream(); - session.conn.starttls(true); + session.conn:starttls(true); session.secure = false; return true; end); diff -r 7cb6460b18d8 -r 141896297cea prosody --- a/prosody Thu Nov 19 17:53:52 2009 +0100 +++ b/prosody Wed Nov 25 17:39:23 2009 +0000 @@ -14,7 +14,7 @@ CFG_PLUGINDIR=os.getenv("PROSODY_PLUGINDIR"); CFG_DATADIR=os.getenv("PROSODY_DATADIR"); --- -- -- -- -- -- -- ---- -- -- -- -- -- -- -- -- +-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- if CFG_SOURCEDIR then package.path = CFG_SOURCEDIR.."/?.lua;"..package.path; @@ -58,7 +58,27 @@ function read_config() -- TODO: Check for other formats when we add support for them -- Use lfs? Make a new conf/ dir? - local ok, level, err = config.load((CFG_CONFIGDIR or ".").."/prosody.cfg.lua"); + local filenames = {}; + + local filename; + if arg[1] == "--config" and arg[2] then + table.insert(filenames, arg[2]); + if CFG_CONFIGDIR then + table.insert(filenames, CFG_CONFIGDIR.."/"..arg[2]); + end + else + table.insert(filenames, (CFG_CONFIGDIR or ".").."/prosody.cfg.lua"); + end + for _,_filename in ipairs(filenames) do + filename = _filename; + local file = io.open(filename); + if file then + file:close(); + CFG_CONFIGDIR = filename:match("^(.*)[\\/][^\\/]*$"); + break; + end + end + local ok, level, err = config.load(filename); if not ok then print("\n"); print("**************************"); @@ -82,13 +102,13 @@ end function load_libraries() - --- Initialize logging + -- Initialize logging require "core.loggingmanager" - --- Check runtime dependencies + -- Check runtime dependencies require "util.dependencies" - --- Load socket framework + -- Load socket framework server = require "net.server" end diff -r 7cb6460b18d8 -r 141896297cea tests/modulemanager_option_conversion.lua --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/modulemanager_option_conversion.lua Wed Nov 25 17:39:23 2009 +0000 @@ -0,0 +1,55 @@ +package.path = "../?.lua;"..package.path; + +local api = require "core.modulemanager".api; + +local module = setmetatable({}, {__index = api}); +local opt = nil; +function module:log() end +function module:get_option(name) + if name == "opt" then + return opt; + else + return nil; + end +end + +function test_value(value, returns) + opt = value; + assert(module:get_option_number("opt") == returns.number, "number doesn't match"); + assert(module:get_option_string("opt") == returns.string, "string doesn't match"); + assert(module:get_option_boolean("opt") == returns.boolean, "boolean doesn't match"); + + if type(returns.array) == "table" then + local target_array, returned_array = returns.array, module:get_option_array("opt"); + assert(#target_array == #returned_array, "array length doesn't match"); + for i=1,#target_array do + assert(target_array[i] == returned_array[i], "array item doesn't match"); + end + else + assert(module:get_option_array("opt") == returns.array, "array is returned (not nil)"); + end + + if type(returns.set) == "table" then + local target_items, returned_items = set.new(returns.set), module:get_option_set("opt"); + assert(target_items == returned_items, "set doesn't match"); + else + assert(module:get_option_set("opt") == returns.set, "set is returned (not nil)"); + end +end + +test_value(nil, {}); + +test_value(true, { boolean = true, string = "true", array = {true}, set = {true} }); +test_value(false, { boolean = false, string = "false", array = {false}, set = {false} }); +test_value("true", { boolean = true, string = "true", array = {"true"}, set = {"true"} }); +test_value("false", { boolean = false, string = "false", array = {"false"}, set = {"false"} }); +test_value(1, { boolean = true, string = "1", array = {1}, set = {1}, number = 1 }); +test_value(0, { boolean = false, string = "0", array = {0}, set = {0}, number = 0 }); + +test_value("hello world", { string = "hello world", array = {"hello world"}, set = {"hello world"} }); +test_value(1234, { string = "1234", number = 1234, array = {1234}, set = {1234} }); + +test_value({1, 2, 3}, { boolean = true, string = "1", number = 1, array = {1, 2, 3}, set = {1, 2, 3} }); +test_value({1, 2, 3, 3, 4}, {boolean = true, string = "1", number = 1, array = {1, 2, 3, 3, 4}, set = {1, 2, 3, 4} }); +test_value({0, 1, 2, 3}, { boolean = false, string = "0", number = 0, array = {0, 1, 2, 3}, set = {0, 1, 2, 3} }); + diff -r 7cb6460b18d8 -r 141896297cea util/datamanager.lua --- a/util/datamanager.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/util/datamanager.lua Wed Nov 25 17:39:23 2009 +0000 @@ -15,13 +15,13 @@ local log = require "util.logger".init("datamanager"); local io_open = io.open; local os_remove = os.remove; -local io_popen = io.popen; local tostring, tonumber = tostring, tonumber; local error = error; local next = next; local t_insert = table.insert; local append = require "util.serialization".append; local path_separator = "/"; if os.getenv("WINDIR") then path_separator = "\\" end +local lfs_mkdir = require "lfs".mkdir; module "datamanager" @@ -43,7 +43,7 @@ local function mkdir(path) path = path:gsub("/", path_separator); -- TODO as an optimization, do this during path creation rather than here if not _mkdir[path] then - local x = io_popen("mkdir \""..path.."\" 2>&1"):read("*a"); + lfs_mkdir(path); _mkdir[path] = true; end return path; diff -r 7cb6460b18d8 -r 141896297cea util/dependencies.lua --- a/util/dependencies.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/util/dependencies.lua Wed Nov 25 17:39:23 2009 +0000 @@ -17,8 +17,12 @@ print("Prosody was unable to find "..tostring(name)); print("This package can be obtained in the following ways:"); print(""); - for k,v in pairs(sources) do - print("", k, v); + local longest_platform = 0; + for platform in pairs(sources) do + longest_platform = math.max(longest_platform, #platform); + end + for platform, source in pairs(sources) do + print("", platform..":"..(" "):rep(4+longest_platform-#platform)..source); end print(""); print(msg or (name.." is required for Prosody to run, so we will now exit.")); @@ -30,24 +34,51 @@ local lxp = softreq "lxp" if not lxp then - missingdep("luaexpat", { ["Ubuntu 8.04 (Hardy)"] = "sudo apt-get install liblua5.1-expat0"; ["luarocks"] = "luarocks install luaexpat"; }); + missingdep("luaexpat", { + ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-expat0"; + ["luarocks"] = "luarocks install luaexpat"; + ["Source"] = "http://www.keplerproject.org/luaexpat/"; + }); fatal = true; end local socket = softreq "socket" if not socket then - missingdep("luasocket", { ["Ubuntu 8.04 (Hardy)"] = "sudo apt-get install liblua5.1-socket2"; ["luarocks"] = "luarocks install luasocket"; }); + missingdep("luasocket", { + ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-socket2"; + ["luarocks"] = "luarocks install luasocket"; + ["Source"] = "http://www.tecgraf.puc-rio.br/~diego/professional/luasocket/"; + }); fatal = true; end +local lfs, err = softreq "lfs" +if not lfs then + missingdep("luafilesystem", { + ["luarocks"] = "luarocks install luafilesystem"; + ["Debian/Ubuntu"] = "sudo apt-get install liblua5.1-luafilesystem0"; + ["Source"] = "http://www.keplerproject.org/luafilesystem/"; + }); + fatal = true; +end + local ssl = softreq "ssl" if not ssl then if config.get("*", "core", "run_without_ssl") then log("warn", "Running without SSL support because run_without_ssl is defined in the config"); else - missingdep("LuaSec", { ["Source"] = "http://www.inf.puc-rio.br/~brunoos/luasec/" }, "SSL/TLS support will not be available"); + missingdep("LuaSec", { + ["Debian/Ubuntu"] = "http://prosody.im/download/start#debian_and_ubuntu"; + ["luarocks"] = "luarocks install luasec"; + ["Source"] = "http://www.inf.puc-rio.br/~brunoos/luasec/"; + }, "SSL/TLS support will not be available"); + end +else + local major, minor, veryminor, patched = ssl._VERSION:match("(%d+)%.(%d+)%.?(%d*)(M?)"); + if not major or ((tonumber(major) == 0 and (tonumber(minor) or 0) <= 3 and (tonumber(veryminor) or 0) <= 2) and patched ~= "M") then + log("error", "This version of LuaSec contains a known bug that causes disconnects, see http://prosody.im/doc/depends"); end end diff -r 7cb6460b18d8 -r 141896297cea util/serialization.lua --- a/util/serialization.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/util/serialization.lua Wed Nov 25 17:39:23 2009 +0000 @@ -13,6 +13,7 @@ local t_concat = table.concat; local error = error; local pairs = pairs; +local next = next; local debug_traceback = debug.traceback; local log = require "util.logger".init("serialization"); @@ -34,21 +35,25 @@ elseif type(o) == "string" then func(t, (("%q"):format(o):gsub("\\\n", "\\n"))); elseif type(o) == "table" then - func(t, "{\n"); - for k,v in pairs(o) do - func(t, indent(ind)); - func(t, "["); - func(t, basicSerialize(k)); - func(t, "] = "); - if ind == 0 then - _simplesave(v, 0, t, func); - else - _simplesave(v, ind+1, t, func); + if next(o) then + func(t, "{\n"); + for k,v in pairs(o) do + func(t, indent(ind)); + func(t, "["); + func(t, basicSerialize(k)); + func(t, "] = "); + if ind == 0 then + _simplesave(v, 0, t, func); + else + _simplesave(v, ind+1, t, func); + end + func(t, ";\n"); end - func(t, ",\n"); + func(t, indent(ind-1)); + func(t, "}"); + else + func(t, "{}"); end - func(t, indent(ind-1)); - func(t, "}"); elseif type(o) == "boolean" then func(t, (o and "true" or "false")); else diff -r 7cb6460b18d8 -r 141896297cea util/timer.lua --- a/util/timer.lua Thu Nov 19 17:53:52 2009 +0100 +++ b/util/timer.lua Wed Nov 25 17:39:23 2009 +0000 @@ -8,6 +8,9 @@ local ns_addtimer = require "net.server".addtimer; +local event = require "net.server".event; +local event_base = require "net.server".event_base; + local get_time = os.time; local t_insert = table.insert; local t_remove = table.remove; @@ -19,33 +22,51 @@ module "timer" -local function _add_task(delay, func) - local current_time = get_time(); - delay = delay + current_time; - if delay >= current_time then - t_insert(new_data, {delay, func}); - else func(); end +local _add_task; +if not event then + function _add_task(delay, func) + local current_time = get_time(); + delay = delay + current_time; + if delay >= current_time then + t_insert(new_data, {delay, func}); + else + func(); + end + end + + ns_addtimer(function() + local current_time = get_time(); + if #new_data > 0 then + for _, d in pairs(new_data) do + t_insert(data, d); + end + new_data = {}; + end + + for i, d in pairs(data) do + local t, func = d[1], d[2]; + if t <= current_time then + data[i] = nil; + local r = func(current_time); + if type(r) == "number" then _add_task(r, func); end + end + end + end); +else + local EVENT_LEAVE = (event.core and event.core.LEAVE) or -1; + function _add_task(delay, func) + event_base:addevent(nil, event.EV_TIMEOUT, function () + local ret = func(); + if ret then + _add_task(ret, func); + else + return EVENT_LEAVE; + end + end + , delay); + end end add_task = _add_task; -ns_addtimer(function() - local current_time = get_time(); - if #new_data > 0 then - for _, d in pairs(new_data) do - t_insert(data, d); - end - new_data = {}; - end - - for i, d in pairs(data) do - local t, func = d[1], d[2]; - if t <= current_time then - data[i] = nil; - local r = func(current_time); - if type(r) == "number" then _add_task(r, func); end - end - end -end); - return _M;