Comparison

net/server_epoll.lua @ 7583:0498daffa6f0

net.server_epoll: Call callbacks through common method in protected mode
author Kim Alvefur <zash@zash.se>
date Thu, 18 Aug 2016 15:50:18 +0200
parent 7574:97b8506118a9
child 7584:98ee3ed105cf
Side-by-side comparison (legend: equal, deleted, inserted, replaced)
left: 7574:97b8506118a9 | right: 7583:0498daffa6f0
12 local t_insert = table.insert; 12 local t_insert = table.insert;
13 local t_remove = table.remove; 13 local t_remove = table.remove;
14 local t_concat = table.concat; 14 local t_concat = table.concat;
15 local setmetatable = setmetatable; 15 local setmetatable = setmetatable;
16 local tostring = tostring; 16 local tostring = tostring;
17 local pcall = pcall;
17 local log = require "util.logger".init("server_epoll"); 18 local log = require "util.logger".init("server_epoll");
18 local epoll = require "epoll"; 19 local epoll = require "epoll";
19 local socket = require "socket"; 20 local socket = require "socket";
20 local luasec = require "ssl"; 21 local luasec = require "ssl";
21 local gettime = require "util.time".now; 22 local gettime = require "util.time".now;
125 126
126 function interface:setlistener(listeners) 127 function interface:setlistener(listeners)
127 self.listeners = listeners; 128 self.listeners = listeners;
128 end 129 end
129 130
131 -- Call callback
132 function interface:on(what, ...)
133 local listener = self.listeners["on"..what];
134 if not listener then
135 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging
136 return;
137 end
138 local ok, err = pcall(listener, self, ...);
139 if not ok then
140 log("error", "Error calling on%s: %s", what, err);
141 end
142 return err;
143 end
144
130 function interface:getfd() 145 function interface:getfd()
131 return self.conn:getfd(); 146 return self.conn:getfd();
132 end 147 end
133 148
134 function interface:ip() 149 function interface:ip()
158 if self._readtimeout then 173 if self._readtimeout then
159 self._readtimeout[1] = gettime() + t; 174 self._readtimeout[1] = gettime() + t;
160 resort_timers = true; 175 resort_timers = true;
161 else 176 else
162 self._readtimeout = addtimer(t, function () 177 self._readtimeout = addtimer(t, function ()
163 if self:onreadtimeout() then 178 if self:on("readtimeout") then
164 return cfg.read_timeout; 179 return cfg.read_timeout;
165 else 180 else
166 self:ondisconnect("read timeout"); 181 self:on("disconnect", "read timeout");
167 self:destroy(); 182 self:destroy();
168 end 183 end
169 end); 184 end);
170 end
171 end
172
173 function interface:onreadtimeout()
174 if self.listeners.onreadtimeout then
175 return self.listeners.onreadtimeout(self);
176 end 185 end
177 end 186 end
178 187
179 function interface:setwritetimeout(t) 188 function interface:setwritetimeout(t)
180 if t == false then 189 if t == false then
188 if self._writetimeout then 197 if self._writetimeout then
189 self._writetimeout[1] = gettime() + t; 198 self._writetimeout[1] = gettime() + t;
190 resort_timers = true; 199 resort_timers = true;
191 else 200 else
192 self._writetimeout = addtimer(t, function () 201 self._writetimeout = addtimer(t, function ()
193 self.listeners.ondisconnect(self, "write timeout"); 202 self:on("disconnect", "write timeout");
194 self:destroy(); 203 self:destroy();
195 end); 204 end);
196 end 205 end
197 end 206 end
198 207
232 241
233 -- Called when socket is readable 242 -- Called when socket is readable
234 function interface:onreadable() 243 function interface:onreadable()
235 local data, err, partial = self.conn:receive(self._pattern); 244 local data, err, partial = self.conn:receive(self._pattern);
236 if data or partial then 245 if data or partial then
237 self.listeners.onincoming(self, data or partial, err); 246 self:on("incoming", data or partial, err);
238 end 247 end
239 if err == "wantread" then 248 if err == "wantread" then
240 self:setflags(true, nil); 249 self:setflags(true, nil);
241 elseif err == "wantwrite" then 250 elseif err == "wantwrite" then
242 self:setflags(nil, true); 251 self:setflags(nil, true);
243 elseif not data and err ~= "timeout" then 252 elseif not data and err ~= "timeout" then
244 self.listeners.ondisconnect(self, err); 253 self:on("disconnect", err);
245 self:destroy() 254 self:destroy()
246 return; 255 return;
247 end 256 end
248 self:setreadtimeout(); 257 self:setreadtimeout();
249 if self.conn:dirty() then 258 if self.conn:dirty() then
273 if err == "wantwrite" or err == "timeout" then 282 if err == "wantwrite" or err == "timeout" then
274 self:setflags(nil, true); 283 self:setflags(nil, true);
275 elseif err == "wantread" then 284 elseif err == "wantread" then
276 self:setflags(true, nil); 285 self:setflags(true, nil);
277 elseif err and err ~= "timeout" then 286 elseif err and err ~= "timeout" then
278 self.listeners.ondisconnect(self, err); 287 self:on("disconnect", err);
279 self:destroy(); 288 self:destroy();
280 end 289 end
281 end 290 end
282 291
283 function interface:ondrain() 292 function interface:ondrain()
284 if self.listeners.ondrain then 293 self:on("drain");
285 self.listeners.ondrain(self);
286 end
287 if self._starttls then 294 if self._starttls then
288 self:starttls(); 295 self:starttls();
289 elseif self._toclose then 296 elseif self._toclose then
290 self:close(); 297 self:close();
291 end 298 end
309 log("debug", "Close %s after writing", tostring(self)); 316 log("debug", "Close %s after writing", tostring(self));
310 self._toclose = true; 317 self._toclose = true;
311 else 318 else
312 log("debug", "Close %s", tostring(self)); 319 log("debug", "Close %s", tostring(self));
313 self.close = noop; 320 self.close = noop;
314 self.listeners.ondisconnect(self); 321 self:on("disconnect");
315 self:destroy(); 322 self:destroy();
316 end 323 end
317 end 324 end
318 325
319 function interface:destroy() 326 function interface:destroy()
334 self._starttls = true; 341 self._starttls = true;
335 else 342 else
336 self:setflags(false, false); 343 self:setflags(false, false);
337 local conn, err = luasec.wrap(self.conn, ctx or self.tls); 344 local conn, err = luasec.wrap(self.conn, ctx or self.tls);
338 if not conn then 345 if not conn then
339 self:ondisconnect(err); 346 self:on("disconnect", err);
340 self:destroy(); 347 self:destroy();
341 end 348 end
342 conn:settimeout(0); 349 conn:settimeout(0);
343 self.conn = conn; 350 self.conn = conn;
344 self._starttls = nil; 351 self._starttls = nil;
356 self:setflags(true, true); 363 self:setflags(true, true);
357 local old = self._tls; 364 local old = self._tls;
358 self._tls = true; 365 self._tls = true;
359 self.starttls = false; 366 self.starttls = false;
360 if old == false then 367 if old == false then
361 self:onconnect(); 368 self:on("connect");
362 elseif self.listeners.onstatus then 369 else
363 self.listeners.onstatus(self, "ssl-handshake-complete"); 370 self:on("status", "ssl-handshake-complete");
364 end 371 end
365 elseif err == "wantread" then 372 elseif err == "wantread" then
366 self:setflags(true, false); 373 self:setflags(true, false);
367 self:setwritetimeout(false); 374 self:setwritetimeout(false);
368 self:setreadtimeout(cfg.handshake_timeout); 375 self:setreadtimeout(cfg.handshake_timeout);
369 elseif err == "wantwrite" then 376 elseif err == "wantwrite" then
370 self:setflags(false, true); 377 self:setflags(false, true);
371 self:setreadtimeout(false); 378 self:setreadtimeout(false);
372 self:setwritetimeout(cfg.handshake_timeout); 379 self:setwritetimeout(cfg.handshake_timeout);
373 else 380 else
374 self:ondisconnect(err); 381 self:on("disconnect", err);
375 self:destroy(); 382 self:destroy();
376 end 383 end
377 end 384 end
378 385
379 local function wrapsocket(client, server, pattern, listeners, tls) -- luasocket object -> interface object 386 local function wrapsocket(client, server, pattern, listeners, tls) -- luasocket object -> interface object
434 self:onreadable(); 441 self:onreadable();
435 end 442 end
436 end); 443 end);
437 end 444 end
438 445
439 function interface:ondisconnect(err)
440 if self.listeners.ondisconnect then
441 self.listeners.ondisconnect(self, err);
442 end
443 end
444
445 function interface:onconnect() 446 function interface:onconnect()
446 self.onwriteable = nil; 447 self.onwriteable = nil;
447 if self.listeners.onconnect then 448 self:on("connect");
448 self.listeners.onconnect(self);
449 end
450 self:setflags(true); 449 self:setflags(true);
451 return self:onwriteable(); 450 return self:onwriteable();
452 end 451 end
453 452
454 local function addserver(addr, port, listeners, pattern, tls) 453 local function addserver(addr, port, listeners, pattern, tls)