Software / code / prosody
Comparison
net/server_epoll.lua @ 7583:0498daffa6f0
net.server_epoll: Call callbacks through common method in protected mode
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Thu, 18 Aug 2016 15:50:18 +0200 |
| parent | 7574:97b8506118a9 |
| child | 7584:98ee3ed105cf |
comparison
equal
deleted
inserted
replaced
| 7574:97b8506118a9 | 7583:0498daffa6f0 |
|---|---|
| 12 local t_insert = table.insert; | 12 local t_insert = table.insert; |
| 13 local t_remove = table.remove; | 13 local t_remove = table.remove; |
| 14 local t_concat = table.concat; | 14 local t_concat = table.concat; |
| 15 local setmetatable = setmetatable; | 15 local setmetatable = setmetatable; |
| 16 local tostring = tostring; | 16 local tostring = tostring; |
| 17 local pcall = pcall; | |
| 17 local log = require "util.logger".init("server_epoll"); | 18 local log = require "util.logger".init("server_epoll"); |
| 18 local epoll = require "epoll"; | 19 local epoll = require "epoll"; |
| 19 local socket = require "socket"; | 20 local socket = require "socket"; |
| 20 local luasec = require "ssl"; | 21 local luasec = require "ssl"; |
| 21 local gettime = require "util.time".now; | 22 local gettime = require "util.time".now; |
| 125 | 126 |
| 126 function interface:setlistener(listeners) | 127 function interface:setlistener(listeners) |
| 127 self.listeners = listeners; | 128 self.listeners = listeners; |
| 128 end | 129 end |
| 129 | 130 |
| 131 -- Call callback | |
| 132 function interface:on(what, ...) | |
| 133 local listener = self.listeners["on"..what]; | |
| 134 if not listener then | |
| 135 -- log("debug", "Missing listener 'on%s'", what); -- uncomment for development and debugging | |
| 136 return; | |
| 137 end | |
| 138 local ok, err = pcall(listener, self, ...); | |
| 139 if not ok then | |
| 140 log("error", "Error calling on%s: %s", what, err); | |
| 141 end | |
| 142 return err; | |
| 143 end | |
| 144 | |
| 130 function interface:getfd() | 145 function interface:getfd() |
| 131 return self.conn:getfd(); | 146 return self.conn:getfd(); |
| 132 end | 147 end |
| 133 | 148 |
| 134 function interface:ip() | 149 function interface:ip() |
| 158 if self._readtimeout then | 173 if self._readtimeout then |
| 159 self._readtimeout[1] = gettime() + t; | 174 self._readtimeout[1] = gettime() + t; |
| 160 resort_timers = true; | 175 resort_timers = true; |
| 161 else | 176 else |
| 162 self._readtimeout = addtimer(t, function () | 177 self._readtimeout = addtimer(t, function () |
| 163 if self:onreadtimeout() then | 178 if self:on("readtimeout") then |
| 164 return cfg.read_timeout; | 179 return cfg.read_timeout; |
| 165 else | 180 else |
| 166 self:ondisconnect("read timeout"); | 181 self:on("disconnect", "read timeout"); |
| 167 self:destroy(); | 182 self:destroy(); |
| 168 end | 183 end |
| 169 end); | 184 end); |
| 170 end | |
| 171 end | |
| 172 | |
| 173 function interface:onreadtimeout() | |
| 174 if self.listeners.onreadtimeout then | |
| 175 return self.listeners.onreadtimeout(self); | |
| 176 end | 185 end |
| 177 end | 186 end |
| 178 | 187 |
| 179 function interface:setwritetimeout(t) | 188 function interface:setwritetimeout(t) |
| 180 if t == false then | 189 if t == false then |
| 188 if self._writetimeout then | 197 if self._writetimeout then |
| 189 self._writetimeout[1] = gettime() + t; | 198 self._writetimeout[1] = gettime() + t; |
| 190 resort_timers = true; | 199 resort_timers = true; |
| 191 else | 200 else |
| 192 self._writetimeout = addtimer(t, function () | 201 self._writetimeout = addtimer(t, function () |
| 193 self.listeners.ondisconnect(self, "write timeout"); | 202 self:on("disconnect", "write timeout"); |
| 194 self:destroy(); | 203 self:destroy(); |
| 195 end); | 204 end); |
| 196 end | 205 end |
| 197 end | 206 end |
| 198 | 207 |
| 232 | 241 |
| 233 -- Called when socket is readable | 242 -- Called when socket is readable |
| 234 function interface:onreadable() | 243 function interface:onreadable() |
| 235 local data, err, partial = self.conn:receive(self._pattern); | 244 local data, err, partial = self.conn:receive(self._pattern); |
| 236 if data or partial then | 245 if data or partial then |
| 237 self.listeners.onincoming(self, data or partial, err); | 246 self:on("incoming", data or partial, err); |
| 238 end | 247 end |
| 239 if err == "wantread" then | 248 if err == "wantread" then |
| 240 self:setflags(true, nil); | 249 self:setflags(true, nil); |
| 241 elseif err == "wantwrite" then | 250 elseif err == "wantwrite" then |
| 242 self:setflags(nil, true); | 251 self:setflags(nil, true); |
| 243 elseif not data and err ~= "timeout" then | 252 elseif not data and err ~= "timeout" then |
| 244 self.listeners.ondisconnect(self, err); | 253 self:on("disconnect", err); |
| 245 self:destroy() | 254 self:destroy() |
| 246 return; | 255 return; |
| 247 end | 256 end |
| 248 self:setreadtimeout(); | 257 self:setreadtimeout(); |
| 249 if self.conn:dirty() then | 258 if self.conn:dirty() then |
| 273 if err == "wantwrite" or err == "timeout" then | 282 if err == "wantwrite" or err == "timeout" then |
| 274 self:setflags(nil, true); | 283 self:setflags(nil, true); |
| 275 elseif err == "wantread" then | 284 elseif err == "wantread" then |
| 276 self:setflags(true, nil); | 285 self:setflags(true, nil); |
| 277 elseif err and err ~= "timeout" then | 286 elseif err and err ~= "timeout" then |
| 278 self.listeners.ondisconnect(self, err); | 287 self:on("disconnect", err); |
| 279 self:destroy(); | 288 self:destroy(); |
| 280 end | 289 end |
| 281 end | 290 end |
| 282 | 291 |
| 283 function interface:ondrain() | 292 function interface:ondrain() |
| 284 if self.listeners.ondrain then | 293 self:on("drain"); |
| 285 self.listeners.ondrain(self); | |
| 286 end | |
| 287 if self._starttls then | 294 if self._starttls then |
| 288 self:starttls(); | 295 self:starttls(); |
| 289 elseif self._toclose then | 296 elseif self._toclose then |
| 290 self:close(); | 297 self:close(); |
| 291 end | 298 end |
| 309 log("debug", "Close %s after writing", tostring(self)); | 316 log("debug", "Close %s after writing", tostring(self)); |
| 310 self._toclose = true; | 317 self._toclose = true; |
| 311 else | 318 else |
| 312 log("debug", "Close %s", tostring(self)); | 319 log("debug", "Close %s", tostring(self)); |
| 313 self.close = noop; | 320 self.close = noop; |
| 314 self.listeners.ondisconnect(self); | 321 self:on("disconnect"); |
| 315 self:destroy(); | 322 self:destroy(); |
| 316 end | 323 end |
| 317 end | 324 end |
| 318 | 325 |
| 319 function interface:destroy() | 326 function interface:destroy() |
| 334 self._starttls = true; | 341 self._starttls = true; |
| 335 else | 342 else |
| 336 self:setflags(false, false); | 343 self:setflags(false, false); |
| 337 local conn, err = luasec.wrap(self.conn, ctx or self.tls); | 344 local conn, err = luasec.wrap(self.conn, ctx or self.tls); |
| 338 if not conn then | 345 if not conn then |
| 339 self:ondisconnect(err); | 346 self:on("disconnect", err); |
| 340 self:destroy(); | 347 self:destroy(); |
| 341 end | 348 end |
| 342 conn:settimeout(0); | 349 conn:settimeout(0); |
| 343 self.conn = conn; | 350 self.conn = conn; |
| 344 self._starttls = nil; | 351 self._starttls = nil; |
| 356 self:setflags(true, true); | 363 self:setflags(true, true); |
| 357 local old = self._tls; | 364 local old = self._tls; |
| 358 self._tls = true; | 365 self._tls = true; |
| 359 self.starttls = false; | 366 self.starttls = false; |
| 360 if old == false then | 367 if old == false then |
| 361 self:onconnect(); | 368 self:on("connect"); |
| 362 elseif self.listeners.onstatus then | 369 else |
| 363 self.listeners.onstatus(self, "ssl-handshake-complete"); | 370 self:on("status", "ssl-handshake-complete"); |
| 364 end | 371 end |
| 365 elseif err == "wantread" then | 372 elseif err == "wantread" then |
| 366 self:setflags(true, false); | 373 self:setflags(true, false); |
| 367 self:setwritetimeout(false); | 374 self:setwritetimeout(false); |
| 368 self:setreadtimeout(cfg.handshake_timeout); | 375 self:setreadtimeout(cfg.handshake_timeout); |
| 369 elseif err == "wantwrite" then | 376 elseif err == "wantwrite" then |
| 370 self:setflags(false, true); | 377 self:setflags(false, true); |
| 371 self:setreadtimeout(false); | 378 self:setreadtimeout(false); |
| 372 self:setwritetimeout(cfg.handshake_timeout); | 379 self:setwritetimeout(cfg.handshake_timeout); |
| 373 else | 380 else |
| 374 self:ondisconnect(err); | 381 self:on("disconnect", err); |
| 375 self:destroy(); | 382 self:destroy(); |
| 376 end | 383 end |
| 377 end | 384 end |
| 378 | 385 |
| 379 local function wrapsocket(client, server, pattern, listeners, tls) -- luasocket object -> interface object | 386 local function wrapsocket(client, server, pattern, listeners, tls) -- luasocket object -> interface object |
| 434 self:onreadable(); | 441 self:onreadable(); |
| 435 end | 442 end |
| 436 end); | 443 end); |
| 437 end | 444 end |
| 438 | 445 |
| 439 function interface:ondisconnect(err) | |
| 440 if self.listeners.ondisconnect then | |
| 441 self.listeners.ondisconnect(self, err); | |
| 442 end | |
| 443 end | |
| 444 | |
| 445 function interface:onconnect() | 446 function interface:onconnect() |
| 446 self.onwriteable = nil; | 447 self.onwriteable = nil; |
| 447 if self.listeners.onconnect then | 448 self:on("connect"); |
| 448 self.listeners.onconnect(self); | |
| 449 end | |
| 450 self:setflags(true); | 449 self:setflags(true); |
| 451 return self:onwriteable(); | 450 return self:onwriteable(); |
| 452 end | 451 end |
| 453 | 452 |
| 454 local function addserver(addr, port, listeners, pattern, tls) | 453 local function addserver(addr, port, listeners, pattern, tls) |