Comparison

net/server_epoll.lua @ 10059:c8c3f2eba898 0.11

net.server_epoll: Backport timer optimization 6c2370f17027 from trunk (see #1388) The previous timer handling did not scale well and led to high CPU usage with many connections (each with at least an read timeout).
author Kim Alvefur <zash@zash.se>
date Mon, 08 Jul 2019 01:17:34 +0200
parent 9994:524b8cd76780
child 10061:5c71693c8345
child 10681:e531273341d3
comparison
equal deleted inserted replaced
10057:5c9341a1b36f 10059:c8c3f2eba898
4 -- This project is MIT/X11 licensed. Please see the 4 -- This project is MIT/X11 licensed. Please see the
5 -- COPYING file in the source package for more information. 5 -- COPYING file in the source package for more information.
6 -- 6 --
7 7
8 8
9 local t_sort = table.sort;
10 local t_insert = table.insert; 9 local t_insert = table.insert;
11 local t_remove = table.remove;
12 local t_concat = table.concat; 10 local t_concat = table.concat;
13 local setmetatable = setmetatable; 11 local setmetatable = setmetatable;
14 local tostring = tostring; 12 local tostring = tostring;
15 local pcall = pcall; 13 local pcall = pcall;
16 local type = type; 14 local type = type;
18 local pairs = pairs; 16 local pairs = pairs;
19 local log = require "util.logger".init("server_epoll"); 17 local log = require "util.logger".init("server_epoll");
20 local socket = require "socket"; 18 local socket = require "socket";
21 local luasec = require "ssl"; 19 local luasec = require "ssl";
22 local gettime = require "util.time".now; 20 local gettime = require "util.time".now;
21 local indexedbheap = require "util.indexedbheap";
23 local createtable = require "util.table".create; 22 local createtable = require "util.table".create;
24 local inet = require "util.net"; 23 local inet = require "util.net";
25 local inet_pton = inet.pton; 24 local inet_pton = inet.pton;
26 local _SOCKETINVALID = socket._SOCKETINVALID or -1; 25 local _SOCKETINVALID = socket._SOCKETINVALID or -1;
27 26
64 63
65 local fds = createtable(10, 0); -- FD -> conn 64 local fds = createtable(10, 0); -- FD -> conn
66 65
67 -- Timer and scheduling -- 66 -- Timer and scheduling --
68 67
69 local timers = {}; 68 local timers = indexedbheap.create();
70 69
71 local function noop() end 70 local function noop() end
72 local function closetimer(t) 71 local function closetimer(t)
73 t[1] = 0; 72 t[1] = 0;
74 t[2] = noop; 73 t[2] = noop;
75 end 74 timers:remove(t.id);
76 75 end
77 -- Set to true when timers have changed 76
78 local resort_timers = false; 77 local function reschedule(t, time)
78 t[1] = time;
79 timers:reprioritize(t.id, time);
80 end
79 81
80 -- Add absolute timer 82 -- Add absolute timer
81 local function at(time, f) 83 local function at(time, f)
82 local timer = { time, f, close = closetimer }; 84 local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil };
83 t_insert(timers, timer); 85 timer.id = timers:insert(timer, time);
84 resort_timers = true;
85 return timer; 86 return timer;
86 end 87 end
87 88
88 -- Add relative timer 89 -- Add relative timer
89 local function addtimer(timeout, f) 90 local function addtimer(timeout, f)
92 93
93 -- Run callbacks of expired timers 94 -- Run callbacks of expired timers
94 -- Return time until next timeout 95 -- Return time until next timeout
95 local function runtimers(next_delay, min_wait) 96 local function runtimers(next_delay, min_wait)
96 -- Any timers at all? 97 -- Any timers at all?
97 if not timers[1] then 98 local now = gettime();
99 local peek = timers:peek();
100 while peek do
101
102 if peek > now then
103 next_delay = peek - now;
104 break;
105 end
106
107 local _, timer, id = timers:pop();
108 local ok, ret = pcall(timer[2], now);
109 if ok and type(ret) == "number" then
110 local next_time = now+ret;
111 timer[1] = next_time;
112 timers:insert(timer, next_time);
113 end
114
115 peek = timers:peek();
116 end
117 if peek == nil then
98 return next_delay; 118 return next_delay;
99 end 119 end
100 120
101 if resort_timers then 121 if next_delay < min_wait then
102 -- Sort earliest timers to the end 122 return min_wait;
103 t_sort(timers, function (a, b) return a[1] > b[1]; end); 123 end
104 resort_timers = false;
105 end
106
107 -- Iterate from the end and remove completed timers
108 for i = #timers, 1, -1 do
109 local timer = timers[i];
110 local t, f = timer[1], timer[2];
111 -- Get time for every iteration to increase accuracy
112 local now = gettime();
113 if t > now then
114 -- This timer should not fire yet
115 local diff = t - now;
116 if diff < next_delay then
117 next_delay = diff;
118 end
119 break;
120 end
121 local new_timeout = f(now);
122 if new_timeout then
123 -- Schedule for 'delay' from the time actually scheduled,
124 -- not from now, in order to prevent timer drift.
125 timer[1] = t + new_timeout;
126 resort_timers = true;
127 else
128 t_remove(timers, i);
129 end
130 end
131
132 if resort_timers or next_delay < min_wait then
133 -- Timers may be added from within a timer callback.
134 -- Those would not be considered for next_delay,
135 -- and we might sleep for too long, so instead
136 -- we return a shorter timeout so we can
137 -- properly sort all new timers.
138 next_delay = min_wait;
139 end
140
141 return next_delay; 124 return next_delay;
142 end 125 end
143 126
144 -- Socket handler interface 127 -- Socket handler interface
145 128
241 end 224 end
242 return 225 return
243 end 226 end
244 t = t or cfg.read_timeout; 227 t = t or cfg.read_timeout;
245 if self._readtimeout then 228 if self._readtimeout then
246 self._readtimeout[1] = gettime() + t; 229 self._readtimeout:reschedule(gettime() + t);
247 resort_timers = true;
248 else 230 else
249 self._readtimeout = addtimer(t, function () 231 self._readtimeout = addtimer(t, function ()
250 if self:on("readtimeout") then 232 if self:on("readtimeout") then
251 return cfg.read_timeout; 233 return cfg.read_timeout;
252 else 234 else
266 end 248 end
267 return 249 return
268 end 250 end
269 t = t or cfg.send_timeout; 251 t = t or cfg.send_timeout;
270 if self._writetimeout then 252 if self._writetimeout then
271 self._writetimeout[1] = gettime() + t; 253 self._writetimeout:reschedule(gettime() + t);
272 resort_timers = true;
273 else 254 else
274 self._writetimeout = addtimer(t, function () 255 self._writetimeout = addtimer(t, function ()
275 self:on("disconnect", "write timeout"); 256 self:on("disconnect", "write timeout");
276 self:destroy(); 257 self:destroy();
277 end); 258 end);