Software /
code /
prosody
Comparison
net/server_epoll.lua @ 10059:c8c3f2eba898 0.11
net.server_epoll: Backport timer optimization 6c2370f17027 from trunk (see #1388)
The previous timer handling did not scale well and led to high CPU usage
with many connections (each with at least a read timeout).
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 08 Jul 2019 01:17:34 +0200 |
parent | 9994:524b8cd76780 |
child | 10061:5c71693c8345 |
child | 10681:e531273341d3 |
comparison
equal
deleted
inserted
replaced
10057:5c9341a1b36f | 10059:c8c3f2eba898 |
---|---|
4 -- This project is MIT/X11 licensed. Please see the | 4 -- This project is MIT/X11 licensed. Please see the |
5 -- COPYING file in the source package for more information. | 5 -- COPYING file in the source package for more information. |
6 -- | 6 -- |
7 | 7 |
8 | 8 |
9 local t_sort = table.sort; | |
10 local t_insert = table.insert; | 9 local t_insert = table.insert; |
11 local t_remove = table.remove; | |
12 local t_concat = table.concat; | 10 local t_concat = table.concat; |
13 local setmetatable = setmetatable; | 11 local setmetatable = setmetatable; |
14 local tostring = tostring; | 12 local tostring = tostring; |
15 local pcall = pcall; | 13 local pcall = pcall; |
16 local type = type; | 14 local type = type; |
18 local pairs = pairs; | 16 local pairs = pairs; |
19 local log = require "util.logger".init("server_epoll"); | 17 local log = require "util.logger".init("server_epoll"); |
20 local socket = require "socket"; | 18 local socket = require "socket"; |
21 local luasec = require "ssl"; | 19 local luasec = require "ssl"; |
22 local gettime = require "util.time".now; | 20 local gettime = require "util.time".now; |
21 local indexedbheap = require "util.indexedbheap"; | |
23 local createtable = require "util.table".create; | 22 local createtable = require "util.table".create; |
24 local inet = require "util.net"; | 23 local inet = require "util.net"; |
25 local inet_pton = inet.pton; | 24 local inet_pton = inet.pton; |
26 local _SOCKETINVALID = socket._SOCKETINVALID or -1; | 25 local _SOCKETINVALID = socket._SOCKETINVALID or -1; |
27 | 26 |
64 | 63 |
65 local fds = createtable(10, 0); -- FD -> conn | 64 local fds = createtable(10, 0); -- FD -> conn |
66 | 65 |
67 -- Timer and scheduling -- | 66 -- Timer and scheduling -- |
68 | 67 |
69 local timers = {}; | 68 local timers = indexedbheap.create(); |
70 | 69 |
71 local function noop() end | 70 local function noop() end |
72 local function closetimer(t) | 71 local function closetimer(t) |
73 t[1] = 0; | 72 t[1] = 0; |
74 t[2] = noop; | 73 t[2] = noop; |
75 end | 74 timers:remove(t.id); |
76 | 75 end |
77 -- Set to true when timers have changed | 76 |
78 local resort_timers = false; | 77 local function reschedule(t, time) |
78 t[1] = time; | |
79 timers:reprioritize(t.id, time); | |
80 end | |
79 | 81 |
80 -- Add absolute timer | 82 -- Add absolute timer |
81 local function at(time, f) | 83 local function at(time, f) |
82 local timer = { time, f, close = closetimer }; | 84 local timer = { time, f, close = closetimer, reschedule = reschedule, id = nil }; |
83 t_insert(timers, timer); | 85 timer.id = timers:insert(timer, time); |
84 resort_timers = true; | |
85 return timer; | 86 return timer; |
86 end | 87 end |
87 | 88 |
88 -- Add relative timer | 89 -- Add relative timer |
89 local function addtimer(timeout, f) | 90 local function addtimer(timeout, f) |
92 | 93 |
93 -- Run callbacks of expired timers | 94 -- Run callbacks of expired timers |
94 -- Return time until next timeout | 95 -- Return time until next timeout |
95 local function runtimers(next_delay, min_wait) | 96 local function runtimers(next_delay, min_wait) |
96 -- Any timers at all? | 97 -- Any timers at all? |
97 if not timers[1] then | 98 local now = gettime(); |
99 local peek = timers:peek(); | |
100 while peek do | |
101 | |
102 if peek > now then | |
103 next_delay = peek - now; | |
104 break; | |
105 end | |
106 | |
107 local _, timer, id = timers:pop(); | |
108 local ok, ret = pcall(timer[2], now); | |
109 if ok and type(ret) == "number" then | |
110 local next_time = now+ret; | |
111 timer[1] = next_time; | |
112 timers:insert(timer, next_time); | |
113 end | |
114 | |
115 peek = timers:peek(); | |
116 end | |
117 if peek == nil then | |
98 return next_delay; | 118 return next_delay; |
99 end | 119 end |
100 | 120 |
101 if resort_timers then | 121 if next_delay < min_wait then |
102 -- Sort earliest timers to the end | 122 return min_wait; |
103 t_sort(timers, function (a, b) return a[1] > b[1]; end); | 123 end |
104 resort_timers = false; | |
105 end | |
106 | |
107 -- Iterate from the end and remove completed timers | |
108 for i = #timers, 1, -1 do | |
109 local timer = timers[i]; | |
110 local t, f = timer[1], timer[2]; | |
111 -- Get time for every iteration to increase accuracy | |
112 local now = gettime(); | |
113 if t > now then | |
114 -- This timer should not fire yet | |
115 local diff = t - now; | |
116 if diff < next_delay then | |
117 next_delay = diff; | |
118 end | |
119 break; | |
120 end | |
121 local new_timeout = f(now); | |
122 if new_timeout then | |
123 -- Schedule for 'delay' from the time actually scheduled, | |
124 -- not from now, in order to prevent timer drift. | |
125 timer[1] = t + new_timeout; | |
126 resort_timers = true; | |
127 else | |
128 t_remove(timers, i); | |
129 end | |
130 end | |
131 | |
132 if resort_timers or next_delay < min_wait then | |
133 -- Timers may be added from within a timer callback. | |
134 -- Those would not be considered for next_delay, | |
135 -- and we might sleep for too long, so instead | |
136 -- we return a shorter timeout so we can | |
137 -- properly sort all new timers. | |
138 next_delay = min_wait; | |
139 end | |
140 | |
141 return next_delay; | 124 return next_delay; |
142 end | 125 end |
143 | 126 |
144 -- Socket handler interface | 127 -- Socket handler interface |
145 | 128 |
241 end | 224 end |
242 return | 225 return |
243 end | 226 end |
244 t = t or cfg.read_timeout; | 227 t = t or cfg.read_timeout; |
245 if self._readtimeout then | 228 if self._readtimeout then |
246 self._readtimeout[1] = gettime() + t; | 229 self._readtimeout:reschedule(gettime() + t); |
247 resort_timers = true; | |
248 else | 230 else |
249 self._readtimeout = addtimer(t, function () | 231 self._readtimeout = addtimer(t, function () |
250 if self:on("readtimeout") then | 232 if self:on("readtimeout") then |
251 return cfg.read_timeout; | 233 return cfg.read_timeout; |
252 else | 234 else |
266 end | 248 end |
267 return | 249 return |
268 end | 250 end |
269 t = t or cfg.send_timeout; | 251 t = t or cfg.send_timeout; |
270 if self._writetimeout then | 252 if self._writetimeout then |
271 self._writetimeout[1] = gettime() + t; | 253 self._writetimeout:reschedule(gettime() + t); |
272 resort_timers = true; | |
273 else | 254 else |
274 self._writetimeout = addtimer(t, function () | 255 self._writetimeout = addtimer(t, function () |
275 self:on("disconnect", "write timeout"); | 256 self:on("disconnect", "write timeout"); |
276 self:destroy(); | 257 self:destroy(); |
277 end); | 258 end); |