Diff

net/server_epoll.lua @ 10059:c8c3f2eba898 0.11

net.server_epoll: Backport timer optimization 6c2370f17027 from trunk (see #1388) The previous timer handling did not scale well and led to high CPU usage with many connections (each with at least an read timeout).
author Kim Alvefur <zash@zash.se>
date Mon, 08 Jul 2019 01:17:34 +0200
parent 9994:524b8cd76780
child 10061:5c71693c8345
child 10681:e531273341d3
line wrap: on
line diff
--- a/net/server_epoll.lua	Thu Jun 20 22:25:46 2019 +0200
+++ b/net/server_epoll.lua	Mon Jul 08 01:17:34 2019 +0200
@@ -6,9 +6,7 @@
 --
 
 
-local t_sort = table.sort;
 local t_insert = table.insert;
-local t_remove = table.remove;
 local t_concat = table.concat;
 local setmetatable = setmetatable;
 local tostring = tostring;
@@ -20,6 +18,7 @@
 local socket = require "socket";
 local luasec = require "ssl";
 local gettime = require "util.time".now;
+local indexedbheap = require "util.indexedbheap";
 local createtable = require "util.table".create;
 local inet = require "util.net";
 local inet_pton = inet.pton;
@@ -66,22 +65,24 @@
 
 -- Timer and scheduling --
 
-local timers = {};
+local timers = indexedbheap.create();
 
 local function noop() end
 local function closetimer(t)
 	t[1] = 0;
 	t[2] = noop;
+	timers:remove(t.id);
 end
 
--- Set to true when timers have changed
-local resort_timers = false;
+local function reschedule(t, time)
+	t[1] = time;
+	timers:reprioritize(t.id, time);
+end
 
 -- Add absolute timer
 local function at(time, f)
-	local timer = { time, f, close = closetimer };
-	t_insert(timers, timer);
-	resort_timers = true;
+	local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
+	timer.id = timers:insert(timer, time);
 	return timer;
 end
 
@@ -94,50 +95,32 @@
 -- Return time until next timeout
 local function runtimers(next_delay, min_wait)
 	-- Any timers at all?
-	if not timers[1] then
+	local now = gettime();
+	local peek = timers:peek();
+	while peek do
+
+		if peek > now then
+			next_delay = peek - now;
+			break;
+	end
+
+		local _, timer, id = timers:pop();
+		local ok, ret = pcall(timer[2], now);
+		if ok and type(ret) == "number"  then
+			local next_time = now+ret;
+			timer[1] = next_time;
+			timers:insert(timer, next_time);
+	end
+
+		peek = timers:peek();
+			end
+	if peek == nil then
 		return next_delay;
 	end
 
-	if resort_timers then
-		-- Sort earliest timers to the end
-		t_sort(timers, function (a, b) return a[1] > b[1]; end);
-		resort_timers = false;
+	if next_delay < min_wait then
+		return min_wait;
 	end
-
-	-- Iterate from the end and remove completed timers
-	for i = #timers, 1, -1 do
-		local timer = timers[i];
-		local t, f = timer[1], timer[2];
-		-- Get time for every iteration to increase accuracy
-		local now = gettime();
-		if t > now then
-			-- This timer should not fire yet
-			local diff = t - now;
-			if diff < next_delay then
-				next_delay = diff;
-			end
-			break;
-		end
-		local new_timeout = f(now);
-		if new_timeout then
-			-- Schedule for 'delay' from the time actually scheduled,
-			-- not from now, in order to prevent timer drift.
-			timer[1] = t + new_timeout;
-			resort_timers = true;
-		else
-			t_remove(timers, i);
-		end
-	end
-
-	if resort_timers or next_delay < min_wait then
-		-- Timers may be added from within a timer callback.
-		-- Those would not be considered for next_delay,
-		-- and we might sleep for too long, so instead
-		-- we return a shorter timeout so we can
-		-- properly sort all new timers.
-		next_delay = min_wait;
-	end
-
 	return next_delay;
 end
 
@@ -243,8 +226,7 @@
 	end
 	t = t or cfg.read_timeout;
 	if self._readtimeout then
-		self._readtimeout[1] = gettime() + t;
-		resort_timers = true;
+		self._readtimeout:reschedule(gettime() + t);
 	else
 		self._readtimeout = addtimer(t, function ()
 			if self:on("readtimeout") then
@@ -268,8 +250,7 @@
 	end
 	t = t or cfg.send_timeout;
 	if self._writetimeout then
-		self._writetimeout[1] = gettime() + t;
-		resort_timers = true;
+		self._writetimeout:reschedule(gettime() + t);
 	else
 		self._writetimeout = addtimer(t, function ()
 			self:on("disconnect", "write timeout");