Software /
code /
prosody
Comparison
net/server_epoll.lua @ 7637:cccea9136b2d
net.server_epoll: More comments
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 27 Aug 2016 18:39:23 +0200 |
parent | 7630:abe2697b5e92 |
child | 7657:252823632401 |
comparison
equal
deleted
inserted
replaced
7636:7674cb520557 | 7637:cccea9136b2d |
---|---|
36 handshake_timeout = 60; | 36 handshake_timeout = 60; |
37 max_wait = 86400; | 37 max_wait = 86400; |
38 }; | 38 }; |
39 | 39 |
40 local fds = createtable(10, 0); -- FD -> conn | 40 local fds = createtable(10, 0); -- FD -> conn |
41 | |
42 -- Timer and scheduling -- | |
43 | |
41 local timers = {}; | 44 local timers = {}; |
42 | 45 |
43 local function noop() end | 46 local function noop() end |
44 local function closetimer(t) | 47 local function closetimer(t) |
45 t[1] = 0; | 48 t[1] = 0; |
90 end | 93 end |
91 break; | 94 break; |
92 end | 95 end |
93 local new_timeout = f(now); | 96 local new_timeout = f(now); |
94 if new_timeout then | 97 if new_timeout then |
95 -- Schedlue for 'delay' from the time actually sheduled, | 98 -- Schedule for 'delay' from the time actually scheduled, |
96 -- not from now, in order to prevent timer drift. | 99 -- not from now, in order to prevent timer drift. |
97 timer[1] = t + new_timeout; | 100 timer[1] = t + new_timeout; |
98 resort_timers = true; | 101 resort_timers = true; |
99 else | 102 else |
100 t_remove(timers, i); | 103 t_remove(timers, i); |
109 next_delay = 1e-6; | 112 next_delay = 1e-6; |
110 end | 113 end |
111 return next_delay; | 114 return next_delay; |
112 end | 115 end |
113 | 116 |
117 -- Socket handler interface | |
118 | |
114 local interface = {}; | 119 local interface = {}; |
115 local interface_mt = { __index = interface }; | 120 local interface_mt = { __index = interface }; |
116 | 121 |
117 function interface_mt:__tostring() | 122 function interface_mt:__tostring() |
118 if self.sockname and self.peername then | 123 if self.sockname and self.peername then |
121 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); | 126 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); |
122 end | 127 end |
123 return ("%s FD %d"):format(tostring(self.conn), self:getfd()); | 128 return ("%s FD %d"):format(tostring(self.conn), self:getfd()); |
124 end | 129 end |
125 | 130 |
131 -- Replace the listener and tell the old one | |
126 function interface:setlistener(listeners) | 132 function interface:setlistener(listeners) |
127 self:on("detach"); | 133 self:on("detach"); |
128 self.listeners = listeners; | 134 self.listeners = listeners; |
129 end | 135 end |
130 | 136 |
131 -- Call callback | 137 -- Call a listener callback |
132 function interface:on(what, ...) | 138 function interface:on(what, ...) |
133 local listener = self.listeners["on"..what]; | 139 local listener = self.listeners["on"..what]; |
134 if not listener then | 140 if not listener then |
135 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging | 141 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging |
136 return; | 142 return; |
140 log("error", "Error calling on%s: %s", what, err); | 146 log("error", "Error calling on%s: %s", what, err); |
141 end | 147 end |
142 return err; | 148 return err; |
143 end | 149 end |
144 | 150 |
151 -- Return the file descriptor number | |
145 function interface:getfd() | 152 function interface:getfd() |
146 if self.conn then | 153 if self.conn then |
147 return self.conn:getfd(); | 154 return self.conn:getfd(); |
148 end | 155 end |
149 return -1; | 156 return -1; |
150 end | 157 end |
151 | 158 |
159 -- Get IP address | |
152 function interface:ip() | 160 function interface:ip() |
153 return self.peername or self.sockname; | 161 return self.peername or self.sockname; |
154 end | 162 end |
155 | 163 |
164 -- Get a port number, doesn't matter which | |
156 function interface:port() | 165 function interface:port() |
157 return self.sockport or self.peerport; | 166 return self.sockport or self.peerport; |
158 end | 167 end |
159 | 168 |
169 -- Get local port number | |
160 function interface:clientport() | 170 function interface:clientport() |
161 return self.sockport; | 171 return self.sockport; |
162 end | 172 end |
163 | 173 |
174 -- Get remote port | |
164 function interface:serverport() | 175 function interface:serverport() |
165 if self.sockport then | 176 if self.sockport then |
166 return self.sockport; | 177 return self.sockport; |
167 elseif self.server then | 178 elseif self.server then |
168 self.server:port(); | 179 self.server:port(); |
169 end | 180 end |
170 end | 181 end |
171 | 182 |
183 -- Return underlying socket | |
172 function interface:socket() | 184 function interface:socket() |
173 return self.conn; | 185 return self.conn; |
174 end | 186 end |
175 | 187 |
176 function interface:setoption(k, v) | 188 function interface:setoption(k, v) |
178 if self.conn.setoption then | 190 if self.conn.setoption then |
179 self.conn:setoption(k, v); | 191 self.conn:setoption(k, v); |
180 end | 192 end |
181 end | 193 end |
182 | 194 |
195 -- Timeout for detecting dead or idle sockets | |
183 function interface:setreadtimeout(t) | 196 function interface:setreadtimeout(t) |
184 if t == false then | 197 if t == false then |
185 if self._readtimeout then | 198 if self._readtimeout then |
186 self._readtimeout:close(); | 199 self._readtimeout:close(); |
187 self._readtimeout = nil; | 200 self._readtimeout = nil; |
202 end | 215 end |
203 end); | 216 end); |
204 end | 217 end |
205 end | 218 end |
206 | 219 |
220 -- Timeout for detecting dead sockets | |
207 function interface:setwritetimeout(t) | 221 function interface:setwritetimeout(t) |
208 if t == false then | 222 if t == false then |
209 if self._writetimeout then | 223 if self._writetimeout then |
210 self._writetimeout:close(); | 224 self._writetimeout:close(); |
211 self._writetimeout = nil; | 225 self._writetimeout = nil; |
222 self:destroy(); | 236 self:destroy(); |
223 end); | 237 end); |
224 end | 238 end |
225 end | 239 end |
226 | 240 |
241 -- lua-epoll flag for currently requested poll state | |
227 function interface:flags() | 242 function interface:flags() |
228 if self._wantread then | 243 if self._wantread then |
229 if self._wantwrite then | 244 if self._wantwrite then |
230 return "rw"; | 245 return "rw"; |
231 end | 246 end |
233 elseif self._wantwrite then | 248 elseif self._wantwrite then |
234 return "w"; | 249 return "w"; |
235 end | 250 end |
236 end | 251 end |
237 | 252 |
253 -- Add or remove sockets or modify epoll flags | |
238 function interface:setflags(r, w) | 254 function interface:setflags(r, w) |
239 if r ~= nil then self._wantread = r; end | 255 if r ~= nil then self._wantread = r; end |
240 if w ~= nil then self._wantwrite = w; end | 256 if w ~= nil then self._wantwrite = w; end |
241 local flags = self:flags(); | 257 local flags = self:flags(); |
242 local currentflags = self._flags; | 258 local currentflags = self._flags; |
318 self:on("disconnect", err); | 334 self:on("disconnect", err); |
319 self:destroy(); | 335 self:destroy(); |
320 end | 336 end |
321 end | 337 end |
322 | 338 |
339 -- The write buffer has been successfully emptied | |
323 function interface:ondrain() | 340 function interface:ondrain() |
324 if self._toclose then | 341 if self._toclose then |
325 return self:close(); | 342 return self:close(); |
326 elseif self._starttls then | 343 elseif self._starttls then |
327 return self:starttls(); | 344 return self:starttls(); |
328 else | 345 else |
329 return self:on("drain"); | 346 return self:on("drain"); |
330 end | 347 end |
331 end | 348 end |
332 | 349 |
350 -- Add data to write buffer and set flag for wanting to write | |
333 function interface:write(data) | 351 function interface:write(data) |
334 local buffer = self.writebuffer; | 352 local buffer = self.writebuffer; |
335 if buffer then | 353 if buffer then |
336 t_insert(buffer, data); | 354 t_insert(buffer, data); |
337 else | 355 else |
341 self:setflags(nil, true); | 359 self:setflags(nil, true); |
342 return #data; | 360 return #data; |
343 end | 361 end |
344 interface.send = interface.write; | 362 interface.send = interface.write; |
345 | 363 |
364 -- Close, possibly after writing is done | |
346 function interface:close() | 365 function interface:close() |
347 if self._wantwrite then | 366 if self._wantwrite then |
348 self:setflags(false, true); -- Flush final buffer contents | 367 self:setflags(false, true); -- Flush final buffer contents |
349 self.write, self.send = noop, noop; -- No more writing | 368 self.write, self.send = noop, noop; -- No more writing |
350 log("debug", "Close %s after writing", tostring(self)); | 369 log("debug", "Close %s after writing", tostring(self)); |
452 conn.sockname, conn.sockport = client:getsockname(); | 471 conn.sockname, conn.sockport = client:getsockname(); |
453 end | 472 end |
454 return conn; | 473 return conn; |
455 end | 474 end |
456 | 475 |
476 -- A server interface has new incoming connections waiting | |
477 -- This replaces the onreadable callback | |
457 function interface:onacceptable() | 478 function interface:onacceptable() |
458 local conn, err = self.conn:accept(); | 479 local conn, err = self.conn:accept(); |
459 if not conn then | 480 if not conn then |
460 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); | 481 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); |
461 self:pausefor(cfg.accept_retry_interval); | 482 self:pausefor(cfg.accept_retry_interval); |
464 local client = wrapsocket(conn, self, nil, self.listeners, self.tls); | 485 local client = wrapsocket(conn, self, nil, self.listeners, self.tls); |
465 log("debug", "New connection %s", tostring(client)); | 486 log("debug", "New connection %s", tostring(client)); |
466 client:init(); | 487 client:init(); |
467 end | 488 end |
468 | 489 |
490 -- Initialization | |
469 function interface:init() | 491 function interface:init() |
470 if self.tls and not self._tls then | 492 if self.tls and not self._tls then |
471 self._tls = false; -- This means we should call onconnect when TLS is up | 493 self._tls = false; -- This means we should call onconnect when TLS is up |
472 return self:starttls(); | 494 return self:starttls(); |
473 else | 495 else |
499 end | 521 end |
500 self:setflags(true); | 522 self:setflags(true); |
501 end); | 523 end); |
502 end | 524 end |
503 | 525 |
526 -- Connected! | |
504 function interface:onconnect() | 527 function interface:onconnect() |
505 self.onwriteable = nil; | 528 self.onwriteable = nil; |
506 self:on("connect"); | 529 self:on("connect"); |
507 self:setflags(true); | 530 self:setflags(true); |
508 return self:onwriteable(); | 531 return self:onwriteable(); |
568 self.send = new_send; | 591 self.send = new_send; |
569 end | 592 end |
570 | 593 |
571 local quitting = nil; | 594 local quitting = nil; |
572 | 595 |
596 -- Signal main loop about shutdown via above upvalue | |
573 local function setquitting() | 597 local function setquitting() |
574 quitting = "quitting"; | 598 quitting = "quitting"; |
575 end | 599 end |
576 | 600 |
601 -- Main loop | |
577 local function loop() | 602 local function loop() |
578 repeat | 603 repeat |
579 local t = runtimers(cfg.max_wait); | 604 local t = runtimers(cfg.max_wait); |
580 local fd, r, w = epoll.wait(t); | 605 local fd, r, w = epoll.wait(t); |
581 if fd then | 606 if fd then |