Comparison

net/server_epoll.lua @ 7637:cccea9136b2d

net.server_epoll: More comments
author Kim Alvefur <zash@zash.se>
date Sat, 27 Aug 2016 18:39:23 +0200
parent 7630:abe2697b5e92
child 7657:252823632401
comparison
Legend: equal | deleted | inserted | replaced
Left column: 7636:7674cb520557 | Right column: 7637:cccea9136b2d
36 handshake_timeout = 60; 36 handshake_timeout = 60;
37 max_wait = 86400; 37 max_wait = 86400;
38 }; 38 };
39 39
40 local fds = createtable(10, 0); -- FD -> conn 40 local fds = createtable(10, 0); -- FD -> conn
41
42 -- Timer and scheduling --
43
41 local timers = {}; 44 local timers = {};
42 45
43 local function noop() end 46 local function noop() end
44 local function closetimer(t) 47 local function closetimer(t)
45 t[1] = 0; 48 t[1] = 0;
90 end 93 end
91 break; 94 break;
92 end 95 end
93 local new_timeout = f(now); 96 local new_timeout = f(now);
94 if new_timeout then 97 if new_timeout then
95 -- Schedlue for 'delay' from the time actually sheduled, 98 -- Schedule for 'delay' from the time actually scheduled,
96 -- not from now, in order to prevent timer drift. 99 -- not from now, in order to prevent timer drift.
97 timer[1] = t + new_timeout; 100 timer[1] = t + new_timeout;
98 resort_timers = true; 101 resort_timers = true;
99 else 102 else
100 t_remove(timers, i); 103 t_remove(timers, i);
109 next_delay = 1e-6; 112 next_delay = 1e-6;
110 end 113 end
111 return next_delay; 114 return next_delay;
112 end 115 end
113 116
117 -- Socket handler interface
118
114 local interface = {}; 119 local interface = {};
115 local interface_mt = { __index = interface }; 120 local interface_mt = { __index = interface };
116 121
117 function interface_mt:__tostring() 122 function interface_mt:__tostring()
118 if self.sockname and self.peername then 123 if self.sockname and self.peername then
121 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport); 126 return ("FD %d (%s, %d)"):format(self:getfd(), self.sockname or self.peername, self.sockport or self.peerport);
122 end 127 end
123 return ("%s FD %d"):format(tostring(self.conn), self:getfd()); 128 return ("%s FD %d"):format(tostring(self.conn), self:getfd());
124 end 129 end
125 130
131 -- Replace the listener and tell the old one
126 function interface:setlistener(listeners) 132 function interface:setlistener(listeners)
127 self:on("detach"); 133 self:on("detach");
128 self.listeners = listeners; 134 self.listeners = listeners;
129 end 135 end
130 136
131 -- Call callback 137 -- Call a listener callback
132 function interface:on(what, ...) 138 function interface:on(what, ...)
133 local listener = self.listeners["on"..what]; 139 local listener = self.listeners["on"..what];
134 if not listener then 140 if not listener then
135 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging 141 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
136 return; 142 return;
140 log("error", "Error calling on%s: %s", what, err); 146 log("error", "Error calling on%s: %s", what, err);
141 end 147 end
142 return err; 148 return err;
143 end 149 end
144 150
151 -- Return the file descriptor number
145 function interface:getfd() 152 function interface:getfd()
146 if self.conn then 153 if self.conn then
147 return self.conn:getfd(); 154 return self.conn:getfd();
148 end 155 end
149 return -1; 156 return -1;
150 end 157 end
151 158
159 -- Get IP address
152 function interface:ip() 160 function interface:ip()
153 return self.peername or self.sockname; 161 return self.peername or self.sockname;
154 end 162 end
155 163
164 -- Get a port number, doesn't matter which
156 function interface:port() 165 function interface:port()
157 return self.sockport or self.peerport; 166 return self.sockport or self.peerport;
158 end 167 end
159 168
169 -- Get local port number
160 function interface:clientport() 170 function interface:clientport()
161 return self.sockport; 171 return self.sockport;
162 end 172 end
163 173
174 -- Get remote port
164 function interface:serverport() 175 function interface:serverport()
165 if self.sockport then 176 if self.sockport then
166 return self.sockport; 177 return self.sockport;
167 elseif self.server then 178 elseif self.server then
168 self.server:port(); 179 self.server:port();
169 end 180 end
170 end 181 end
171 182
183 -- Return underlying socket
172 function interface:socket() 184 function interface:socket()
173 return self.conn; 185 return self.conn;
174 end 186 end
175 187
176 function interface:setoption(k, v) 188 function interface:setoption(k, v)
178 if self.conn.setoption then 190 if self.conn.setoption then
179 self.conn:setoption(k, v); 191 self.conn:setoption(k, v);
180 end 192 end
181 end 193 end
182 194
195 -- Timeout for detecting dead or idle sockets
183 function interface:setreadtimeout(t) 196 function interface:setreadtimeout(t)
184 if t == false then 197 if t == false then
185 if self._readtimeout then 198 if self._readtimeout then
186 self._readtimeout:close(); 199 self._readtimeout:close();
187 self._readtimeout = nil; 200 self._readtimeout = nil;
202 end 215 end
203 end); 216 end);
204 end 217 end
205 end 218 end
206 219
220 -- Timeout for detecting dead sockets
207 function interface:setwritetimeout(t) 221 function interface:setwritetimeout(t)
208 if t == false then 222 if t == false then
209 if self._writetimeout then 223 if self._writetimeout then
210 self._writetimeout:close(); 224 self._writetimeout:close();
211 self._writetimeout = nil; 225 self._writetimeout = nil;
222 self:destroy(); 236 self:destroy();
223 end); 237 end);
224 end 238 end
225 end 239 end
226 240
241 -- lua-epoll flag for currently requested poll state
227 function interface:flags() 242 function interface:flags()
228 if self._wantread then 243 if self._wantread then
229 if self._wantwrite then 244 if self._wantwrite then
230 return "rw"; 245 return "rw";
231 end 246 end
233 elseif self._wantwrite then 248 elseif self._wantwrite then
234 return "w"; 249 return "w";
235 end 250 end
236 end 251 end
237 252
253 -- Add or remove sockets or modify epoll flags
238 function interface:setflags(r, w) 254 function interface:setflags(r, w)
239 if r ~= nil then self._wantread = r; end 255 if r ~= nil then self._wantread = r; end
240 if w ~= nil then self._wantwrite = w; end 256 if w ~= nil then self._wantwrite = w; end
241 local flags = self:flags(); 257 local flags = self:flags();
242 local currentflags = self._flags; 258 local currentflags = self._flags;
318 self:on("disconnect", err); 334 self:on("disconnect", err);
319 self:destroy(); 335 self:destroy();
320 end 336 end
321 end 337 end
322 338
339 -- The write buffer has been successfully emptied
323 function interface:ondrain() 340 function interface:ondrain()
324 if self._toclose then 341 if self._toclose then
325 return self:close(); 342 return self:close();
326 elseif self._starttls then 343 elseif self._starttls then
327 return self:starttls(); 344 return self:starttls();
328 else 345 else
329 return self:on("drain"); 346 return self:on("drain");
330 end 347 end
331 end 348 end
332 349
350 -- Add data to write buffer and set flag for wanting to write
333 function interface:write(data) 351 function interface:write(data)
334 local buffer = self.writebuffer; 352 local buffer = self.writebuffer;
335 if buffer then 353 if buffer then
336 t_insert(buffer, data); 354 t_insert(buffer, data);
337 else 355 else
341 self:setflags(nil, true); 359 self:setflags(nil, true);
342 return #data; 360 return #data;
343 end 361 end
344 interface.send = interface.write; 362 interface.send = interface.write;
345 363
364 -- Close, possibly after writing is done
346 function interface:close() 365 function interface:close()
347 if self._wantwrite then 366 if self._wantwrite then
348 self:setflags(false, true); -- Flush final buffer contents 367 self:setflags(false, true); -- Flush final buffer contents
349 self.write, self.send = noop, noop; -- No more writing 368 self.write, self.send = noop, noop; -- No more writing
350 log("debug", "Close %s after writing", tostring(self)); 369 log("debug", "Close %s after writing", tostring(self));
452 conn.sockname, conn.sockport = client:getsockname(); 471 conn.sockname, conn.sockport = client:getsockname();
453 end 472 end
454 return conn; 473 return conn;
455 end 474 end
456 475
476 -- A server interface has new incoming connections waiting
477 -- This replaces the onreadable callback
457 function interface:onacceptable() 478 function interface:onacceptable()
458 local conn, err = self.conn:accept(); 479 local conn, err = self.conn:accept();
459 if not conn then 480 if not conn then
460 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval); 481 log("debug", "Error accepting new client: %s, server will be paused for %ds", err, cfg.accept_retry_interval);
461 self:pausefor(cfg.accept_retry_interval); 482 self:pausefor(cfg.accept_retry_interval);
464 local client = wrapsocket(conn, self, nil, self.listeners, self.tls); 485 local client = wrapsocket(conn, self, nil, self.listeners, self.tls);
465 log("debug", "New connection %s", tostring(client)); 486 log("debug", "New connection %s", tostring(client));
466 client:init(); 487 client:init();
467 end 488 end
468 489
490 -- Initialization
469 function interface:init() 491 function interface:init()
470 if self.tls and not self._tls then 492 if self.tls and not self._tls then
471 self._tls = false; -- This means we should call onconnect when TLS is up 493 self._tls = false; -- This means we should call onconnect when TLS is up
472 return self:starttls(); 494 return self:starttls();
473 else 495 else
499 end 521 end
500 self:setflags(true); 522 self:setflags(true);
501 end); 523 end);
502 end 524 end
503 525
526 -- Connected!
504 function interface:onconnect() 527 function interface:onconnect()
505 self.onwriteable = nil; 528 self.onwriteable = nil;
506 self:on("connect"); 529 self:on("connect");
507 self:setflags(true); 530 self:setflags(true);
508 return self:onwriteable(); 531 return self:onwriteable();
568 self.send = new_send; 591 self.send = new_send;
569 end 592 end
570 593
571 local quitting = nil; 594 local quitting = nil;
572 595
596 -- Signal main loop about shutdown via above upvalue
573 local function setquitting() 597 local function setquitting()
574 quitting = "quitting"; 598 quitting = "quitting";
575 end 599 end
576 600
601 -- Main loop
577 local function loop() 602 local function loop()
578 repeat 603 repeat
579 local t = runtimers(cfg.max_wait); 604 local t = runtimers(cfg.max_wait);
580 local fd, r, w = epoll.wait(t); 605 local fd, r, w = epoll.wait(t);
581 if fd then 606 if fd then