Software /
code /
prosody
Comparison
net/server_epoll.lua @ 7583:0498daffa6f0
net.server_epoll: Call callbacks through common method in protected mode
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 18 Aug 2016 15:50:18 +0200 |
parent | 7574:97b8506118a9 |
child | 7584:98ee3ed105cf |
comparison
equal
deleted
inserted
replaced
7574:97b8506118a9 | 7583:0498daffa6f0 |
---|---|
12 local t_insert = table.insert; | 12 local t_insert = table.insert; |
13 local t_remove = table.remove; | 13 local t_remove = table.remove; |
14 local t_concat = table.concat; | 14 local t_concat = table.concat; |
15 local setmetatable = setmetatable; | 15 local setmetatable = setmetatable; |
16 local tostring = tostring; | 16 local tostring = tostring; |
 | 17 local pcall = pcall; |
17 local log = require "util.logger".init("server_epoll"); | 18 local log = require "util.logger".init("server_epoll"); |
18 local epoll = require "epoll"; | 19 local epoll = require "epoll"; |
19 local socket = require "socket"; | 20 local socket = require "socket"; |
20 local luasec = require "ssl"; | 21 local luasec = require "ssl"; |
21 local gettime = require "util.time".now; | 22 local gettime = require "util.time".now; |
125 | 126 |
126 function interface:setlistener(listeners) | 127 function interface:setlistener(listeners) |
127 self.listeners = listeners; | 128 self.listeners = listeners; |
128 end | 129 end |
129 | 130 |
 | 131 -- Call callback |
 | 132 function interface:on(what, ...) |
 | 133 local listener = self.listeners["on"..what]; |
 | 134 if not listener then |
 | 135 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging |
 | 136 return; |
 | 137 end |
 | 138 local ok, err = pcall(listener, self, ...); |
 | 139 if not ok then |
 | 140 log("error", "Error calling on%s: %s", what, err); |
 | 141 end |
 | 142 return err; |
 | 143 end |
 | 144 |
130 function interface:getfd() | 145 function interface:getfd() |
131 return self.conn:getfd(); | 146 return self.conn:getfd(); |
132 end | 147 end |
133 | 148 |
134 function interface:ip() | 149 function interface:ip() |
158 if self._readtimeout then | 173 if self._readtimeout then |
159 self._readtimeout[1] = gettime() + t; | 174 self._readtimeout[1] = gettime() + t; |
160 resort_timers = true; | 175 resort_timers = true; |
161 else | 176 else |
162 self._readtimeout = addtimer(t, function () | 177 self._readtimeout = addtimer(t, function () |
163 if self:onreadtimeout() then | 178 if self:on("readtimeout") then |
164 return cfg.read_timeout; | 179 return cfg.read_timeout; |
165 else | 180 else |
166 self:ondisconnect("read timeout"); | 181 self:on("disconnect", "read timeout"); |
167 self:destroy(); | 182 self:destroy(); |
168 end | 183 end |
169 end); | 184 end); |
170 end | |
171 end | |
172 | |
173 function interface:onreadtimeout() | |
174 if self.listeners.onreadtimeout then | |
175 return self.listeners.onreadtimeout(self); | |
176 end | 185 end |
177 end | 186 end |
178 | 187 |
179 function interface:setwritetimeout(t) | 188 function interface:setwritetimeout(t) |
180 if t == false then | 189 if t == false then |
188 if self._writetimeout then | 197 if self._writetimeout then |
189 self._writetimeout[1] = gettime() + t; | 198 self._writetimeout[1] = gettime() + t; |
190 resort_timers = true; | 199 resort_timers = true; |
191 else | 200 else |
192 self._writetimeout = addtimer(t, function () | 201 self._writetimeout = addtimer(t, function () |
193 self.listeners.ondisconnect(self, "write timeout"); | 202 self:on("disconnect", "write timeout"); |
194 self:destroy(); | 203 self:destroy(); |
195 end); | 204 end); |
196 end | 205 end |
197 end | 206 end |
198 | 207 |
232 | 241 |
233 -- Called when socket is readable | 242 -- Called when socket is readable |
234 function interface:onreadable() | 243 function interface:onreadable() |
235 local data, err, partial = self.conn:receive(self._pattern); | 244 local data, err, partial = self.conn:receive(self._pattern); |
236 if data or partial then | 245 if data or partial then |
237 self.listeners.onincoming(self, data or partial, err); | 246 self:on("incoming", data or partial, err); |
238 end | 247 end |
239 if err == "wantread" then | 248 if err == "wantread" then |
240 self:setflags(true, nil); | 249 self:setflags(true, nil); |
241 elseif err == "wantwrite" then | 250 elseif err == "wantwrite" then |
242 self:setflags(nil, true); | 251 self:setflags(nil, true); |
243 elseif not data and err ~= "timeout" then | 252 elseif not data and err ~= "timeout" then |
244 self.listeners.ondisconnect(self, err); | 253 self:on("disconnect", err); |
245 self:destroy() | 254 self:destroy() |
246 return; | 255 return; |
247 end | 256 end |
248 self:setreadtimeout(); | 257 self:setreadtimeout(); |
249 if self.conn:dirty() then | 258 if self.conn:dirty() then |
273 if err == "wantwrite" or err == "timeout" then | 282 if err == "wantwrite" or err == "timeout" then |
274 self:setflags(nil, true); | 283 self:setflags(nil, true); |
275 elseif err == "wantread" then | 284 elseif err == "wantread" then |
276 self:setflags(true, nil); | 285 self:setflags(true, nil); |
277 elseif err and err ~= "timeout" then | 286 elseif err and err ~= "timeout" then |
278 self.listeners.ondisconnect(self, err); | 287 self:on("disconnect", err); |
279 self:destroy(); | 288 self:destroy(); |
280 end | 289 end |
281 end | 290 end |
282 | 291 |
283 function interface:ondrain() | 292 function interface:ondrain() |
284 if self.listeners.ondrain then | 293 self:on("drain"); |
285 self.listeners.ondrain(self); | |
286 end | |
287 if self._starttls then | 294 if self._starttls then |
288 self:starttls(); | 295 self:starttls(); |
289 elseif self._toclose then | 296 elseif self._toclose then |
290 self:close(); | 297 self:close(); |
291 end | 298 end |
309 log("debug", "Close %s after writing", tostring(self)); | 316 log("debug", "Close %s after writing", tostring(self)); |
310 self._toclose = true; | 317 self._toclose = true; |
311 else | 318 else |
312 log("debug", "Close %s", tostring(self)); | 319 log("debug", "Close %s", tostring(self)); |
313 self.close = noop; | 320 self.close = noop; |
314 self.listeners.ondisconnect(self); | 321 self:on("disconnect"); |
315 self:destroy(); | 322 self:destroy(); |
316 end | 323 end |
317 end | 324 end |
318 | 325 |
319 function interface:destroy() | 326 function interface:destroy() |
334 self._starttls = true; | 341 self._starttls = true; |
335 else | 342 else |
336 self:setflags(false, false); | 343 self:setflags(false, false); |
337 local conn, err = luasec.wrap(self.conn, ctx or self.tls); | 344 local conn, err = luasec.wrap(self.conn, ctx or self.tls); |
338 if not conn then | 345 if not conn then |
339 self:ondisconnect(err); | 346 self:on("disconnect", err); |
340 self:destroy(); | 347 self:destroy(); |
341 end | 348 end |
342 conn:settimeout(0); | 349 conn:settimeout(0); |
343 self.conn = conn; | 350 self.conn = conn; |
344 self._starttls = nil; | 351 self._starttls = nil; |
356 self:setflags(true, true); | 363 self:setflags(true, true); |
357 local old = self._tls; | 364 local old = self._tls; |
358 self._tls = true; | 365 self._tls = true; |
359 self.starttls = false; | 366 self.starttls = false; |
360 if old == false then | 367 if old == false then |
361 self:onconnect(); | 368 self:on("connect"); |
362 elseif self.listeners.onstatus then | 369 else |
363 self.listeners.onstatus(self, "ssl-handshake-complete"); | 370 self:on("status", "ssl-handshake-complete"); |
364 end | 371 end |
365 elseif err == "wantread" then | 372 elseif err == "wantread" then |
366 self:setflags(true, false); | 373 self:setflags(true, false); |
367 self:setwritetimeout(false); | 374 self:setwritetimeout(false); |
368 self:setreadtimeout(cfg.handshake_timeout); | 375 self:setreadtimeout(cfg.handshake_timeout); |
369 elseif err == "wantwrite" then | 376 elseif err == "wantwrite" then |
370 self:setflags(false, true); | 377 self:setflags(false, true); |
371 self:setreadtimeout(false); | 378 self:setreadtimeout(false); |
372 self:setwritetimeout(cfg.handshake_timeout); | 379 self:setwritetimeout(cfg.handshake_timeout); |
373 else | 380 else |
374 self:ondisconnect(err); | 381 self:on("disconnect", err); |
375 self:destroy(); | 382 self:destroy(); |
376 end | 383 end |
377 end | 384 end |
378 | 385 |
379 local function wrapsocket(client, server, pattern, listeners, tls) -- luasocket object -> interface object | 386 local function wrapsocket(client, server, pattern, listeners, tls) -- luasocket object -> interface object |
434 self:onreadable(); | 441 self:onreadable(); |
435 end | 442 end |
436 end); | 443 end); |
437 end | 444 end |
438 | 445 |
439 function interface:ondisconnect(err) | |
440 if self.listeners.ondisconnect then | |
441 self.listeners.ondisconnect(self, err); | |
442 end | |
443 end | |
444 | |
445 function interface:onconnect() | 446 function interface:onconnect() |
446 self.onwriteable = nil; | 447 self.onwriteable = nil; |
447 if self.listeners.onconnect then | 448 self:on("connect"); |
448 self.listeners.onconnect(self); | |
449 end | |
450 self:setflags(true); | 449 self:setflags(true); |
451 return self:onwriteable(); | 450 return self:onwriteable(); |
452 end | 451 end |
453 | 452 |
454 local function addserver(addr, port, listeners, pattern, tls) | 453 local function addserver(addr, port, listeners, pattern, tls) |