Software / code / prosody
Comparison
net/server_event.lua @ 6851:1f1bed8ebc41
server_event: Remove needless scoping and indentation
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Fri, 25 Sep 2015 17:12:55 +0200 |
| parent | 6850:41de00647ad3 |
| child | 6852:626d8152b1ad |
comparison
equal
deleted
inserted
replaced
| 6850:41de00647ad3 | 6851:1f1bed8ebc41 |
|---|---|
| 87 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) | 87 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) |
| 88 | 88 |
| 89 local interfacelist = { } | 89 local interfacelist = { } |
| 90 | 90 |
| 91 -- Client interface methods | 91 -- Client interface methods |
| 92 local interface_mt | 92 local interface_mt = {}; interface_mt.__index = interface_mt; |
| 93 do | 93 |
| 94 interface_mt = {}; interface_mt.__index = interface_mt; | 94 -- Private methods |
| 95 | 95 function interface_mt:_close() |
| 96 | 96 return self:_destroy(); |
| 97 -- Private methods | 97 end |
| 98 function interface_mt:_close() | 98 |
| 99 return self:_destroy(); | 99 function interface_mt:_start_connection(plainssl) -- should be called from addclient |
| 100 end | 100 local callback = function( event ) |
| 101 | 101 if EV_TIMEOUT == event then -- timeout during connection |
| 102 function interface_mt:_start_connection(plainssl) -- should be called from addclient | 102 self.fatalerror = "connection timeout" |
| 103 local callback = function( event ) | 103 self:ontimeout() -- call timeout listener |
| 104 if EV_TIMEOUT == event then -- timeout during connection | 104 self:_close() |
| 105 self.fatalerror = "connection timeout" | 105 debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) |
| 106 self:ontimeout() -- call timeout listener | 106 else |
| 107 self:_close() | 107 if plainssl and has_luasec then -- start ssl session |
| 108 debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) | 108 self:starttls(self._sslctx, true) |
| 109 else | 109 else -- normal connection |
| 110 if plainssl and has_luasec then -- start ssl session | 110 self:_start_session(true) |
| 111 self:starttls(self._sslctx, true) | |
| 112 else -- normal connection | |
| 113 self:_start_session(true) | |
| 114 end | |
| 115 debug( "new connection established. id:", self.id ) | |
| 116 end | 111 end |
| 117 self.eventconnect = nil | 112 debug( "new connection established. id:", self.id ) |
| 118 return -1 | 113 end |
| 119 end | 114 self.eventconnect = nil |
| 120 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) | 115 return -1 |
| 121 return true | 116 end |
| 122 end | 117 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) |
| 123 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl | |
| 124 if self.type == "client" then | |
| 125 local callback = function( ) | |
| 126 self:_lock( false, false, false ) | |
| 127 --vdebug( "start listening on client socket with id:", self.id ) | |
| 128 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
| 129 if call_onconnect then | |
| 130 self:onconnect() | |
| 131 end | |
| 132 self.eventsession = nil | |
| 133 return -1 | |
| 134 end | |
| 135 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) | |
| 136 else | |
| 137 self:_lock( false ) | |
| 138 --vdebug( "start listening on server socket with id:", self.id ) | |
| 139 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback | |
| 140 end | |
| 141 return true | 118 return true |
| 142 end | 119 end |
| 143 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first | 120 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl |
| 144 --vdebug( "starting ssl session with client id:", self.id ) | 121 if self.type == "client" then |
| 145 local _ | 122 local callback = function( ) |
| 146 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! | 123 self:_lock( false, false, false ) |
| 147 _ = self.eventwrite and self.eventwrite:close( ) | 124 --vdebug( "start listening on client socket with id:", self.id ) |
| 148 self.eventread, self.eventwrite = nil, nil | 125 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback |
| 149 local err | 126 if call_onconnect then |
| 150 self.conn, err = ssl.wrap( self.conn, self._sslctx ) | 127 self:onconnect() |
| 151 if err then | 128 end |
| 152 self.fatalerror = err | 129 self.eventsession = nil |
| 153 self.conn = nil -- cannot be used anymore | 130 return -1 |
| 154 if call_onconnect then | 131 end |
| 155 self.ondisconnect = nil -- dont call this when client isnt really connected | 132 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) |
| 156 end | 133 else |
| 157 self:_close() | 134 self:_lock( false ) |
| 158 debug( "fatal error while ssl wrapping:", err ) | 135 --vdebug( "start listening on server socket with id:", self.id ) |
| 159 return false | 136 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback |
| 160 end | 137 end |
| 161 self.conn:settimeout( 0 ) -- set non blocking | 138 return true |
| 162 local handshakecallback = coroutine_wrap( | 139 end |
| 163 function( event ) | 140 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first |
| 164 local _, err | 141 --vdebug( "starting ssl session with client id:", self.id ) |
| 165 local attempt = 0 | 142 local _ |
| 166 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS | 143 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! |
| 167 while attempt < maxattempt do -- no endless loop | 144 _ = self.eventwrite and self.eventwrite:close( ) |
| 168 attempt = attempt + 1 | 145 self.eventread, self.eventwrite = nil, nil |
| 169 debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) | 146 local err |
| 170 if attempt > maxattempt then | 147 self.conn, err = ssl.wrap( self.conn, self._sslctx ) |
| 171 self.fatalerror = "max handshake attempts exceeded" | 148 if err then |
| 172 elseif EV_TIMEOUT == event then | 149 self.fatalerror = err |
| 173 self.fatalerror = "timeout during handshake" | 150 self.conn = nil -- cannot be used anymore |
| 174 else | 151 if call_onconnect then |
| 175 _, err = self.conn:dohandshake( ) | 152 self.ondisconnect = nil -- dont call this when client isnt really connected |
| 176 if not err then | 153 end |
| 177 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | 154 self:_close() |
| 178 self.send = self.conn.send -- caching table lookups with new client object | 155 debug( "fatal error while ssl wrapping:", err ) |
| 179 self.receive = self.conn.receive | 156 return false |
| 180 if not call_onconnect then -- trigger listener | 157 end |
| 181 self:onstatus("ssl-handshake-complete"); | 158 self.conn:settimeout( 0 ) -- set non blocking |
| 182 end | 159 local handshakecallback = coroutine_wrap( |
| 183 self:_start_session( call_onconnect ) | 160 function( event ) |
| 184 debug( "ssl handshake done" ) | 161 local _, err |
| 185 self.eventhandshake = nil | 162 local attempt = 0 |
| 186 return -1 | 163 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS |
| 164 while attempt < maxattempt do -- no endless loop | |
| 165 attempt = attempt + 1 | |
| 166 debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) | |
| 167 if attempt > maxattempt then | |
| 168 self.fatalerror = "max handshake attempts exceeded" | |
| 169 elseif EV_TIMEOUT == event then | |
| 170 self.fatalerror = "timeout during handshake" | |
| 171 else | |
| 172 _, err = self.conn:dohandshake( ) | |
| 173 if not err then | |
| 174 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | |
| 175 self.send = self.conn.send -- caching table lookups with new client object | |
| 176 self.receive = self.conn.receive | |
| 177 if not call_onconnect then -- trigger listener | |
| 178 self:onstatus("ssl-handshake-complete"); | |
| 187 end | 179 end |
| 188 if err == "wantwrite" then | 180 self:_start_session( call_onconnect ) |
| 189 event = EV_WRITE | 181 debug( "ssl handshake done" ) |
| 190 elseif err == "wantread" then | |
| 191 event = EV_READ | |
| 192 else | |
| 193 debug( "ssl handshake error:", err ) | |
| 194 self.fatalerror = err | |
| 195 end | |
| 196 end | |
| 197 if self.fatalerror then | |
| 198 if call_onconnect then | |
| 199 self.ondisconnect = nil -- dont call this when client isnt really connected | |
| 200 end | |
| 201 self:_close() | |
| 202 debug( "handshake failed because:", self.fatalerror ) | |
| 203 self.eventhandshake = nil | 182 self.eventhandshake = nil |
| 204 return -1 | 183 return -1 |
| 205 end | 184 end |
| 206 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... | 185 if err == "wantwrite" then |
| 186 event = EV_WRITE | |
| 187 elseif err == "wantread" then | |
| 188 event = EV_READ | |
| 189 else | |
| 190 debug( "ssl handshake error:", err ) | |
| 191 self.fatalerror = err | |
| 192 end | |
| 207 end | 193 end |
| 208 end | 194 if self.fatalerror then |
| 209 ) | 195 if call_onconnect then |
| 210 debug "starting handshake..." | 196 self.ondisconnect = nil -- dont call this when client isnt really connected |
| 211 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked | |
| 212 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) | |
| 213 return true | |
| 214 end | |
| 215 function interface_mt:_destroy() -- close this interface + events and call last listener | |
| 216 debug( "closing client with id:", self.id, self.fatalerror ) | |
| 217 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions | |
| 218 local _ | |
| 219 _ = self.eventread and self.eventread:close( ) | |
| 220 if self.type == "client" then | |
| 221 _ = self.eventwrite and self.eventwrite:close( ) | |
| 222 _ = self.eventhandshake and self.eventhandshake:close( ) | |
| 223 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) | |
| 224 _ = self.eventconnect and self.eventconnect:close( ) | |
| 225 _ = self.eventsession and self.eventsession:close( ) | |
| 226 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) | |
| 227 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) | |
| 228 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) | |
| 229 _ = self.conn and self.conn:close( ) -- close connection | |
| 230 _ = self._server and self._server:counter(-1); | |
| 231 self.eventread, self.eventwrite = nil, nil | |
| 232 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil | |
| 233 self.readcallback, self.writecallback = nil, nil | |
| 234 else | |
| 235 self.conn:close( ) | |
| 236 self.eventread, self.eventclose = nil, nil | |
| 237 self.interface, self.readcallback = nil, nil | |
| 238 end | |
| 239 interfacelist[ self ] = nil | |
| 240 return true | |
| 241 end | |
| 242 | |
| 243 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | |
| 244 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | |
| 245 return nointerface, noreading, nowriting | |
| 246 end | |
| 247 | |
| 248 --TODO: Deprecate | |
| 249 function interface_mt:lock_read(switch) | |
| 250 if switch then | |
| 251 return self:pause(); | |
| 252 else | |
| 253 return self:resume(); | |
| 254 end | |
| 255 end | |
| 256 | |
| 257 function interface_mt:pause() | |
| 258 return self:_lock(self.nointerface, true, self.nowriting); | |
| 259 end | |
| 260 | |
| 261 function interface_mt:resume() | |
| 262 self:_lock(self.nointerface, false, self.nowriting); | |
| 263 if not self.eventread then | |
| 264 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
| 265 end | |
| 266 end | |
| 267 | |
| 268 function interface_mt:counter(c) | |
| 269 if c then | |
| 270 self._connections = self._connections + c | |
| 271 end | |
| 272 return self._connections | |
| 273 end | |
| 274 | |
| 275 -- Public methods | |
| 276 function interface_mt:write(data) | |
| 277 if self.nowriting then return nil, "locked" end | |
| 278 --vdebug( "try to send data to client, id/data:", self.id, data ) | |
| 279 data = tostring( data ) | |
| 280 local len = #data | |
| 281 local total = len + self.writebufferlen | |
| 282 if total > cfg.MAX_SEND_LENGTH then -- check buffer length | |
| 283 local err = "send buffer exceeded" | |
| 284 debug( "error:", err ) -- to much, check your app | |
| 285 return nil, err | |
| 286 end | |
| 287 t_insert(self.writebuffer, data) -- new buffer | |
| 288 self.writebufferlen = total | |
| 289 if not self.eventwrite then -- register new write event | |
| 290 --vdebug( "register new write event" ) | |
| 291 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) | |
| 292 end | |
| 293 return true | |
| 294 end | |
| 295 function interface_mt:close() | |
| 296 if self.nointerface then return nil, "locked"; end | |
| 297 debug( "try to close client connection with id:", self.id ) | |
| 298 if self.type == "client" then | |
| 299 self.fatalerror = "client to close" | |
| 300 if self.eventwrite then -- wait for incomplete write request | |
| 301 self:_lock( true, true, false ) | |
| 302 debug "closing delayed until writebuffer is empty" | |
| 303 return nil, "writebuffer not empty, waiting" | |
| 304 else -- close now | |
| 305 self:_lock( true, true, true ) | |
| 306 self:_close() | |
| 307 return true | |
| 308 end | |
| 309 else | |
| 310 debug( "try to close server with id:", tostring(self.id)) | |
| 311 self.fatalerror = "server to close" | |
| 312 self:_lock( true ) | |
| 313 self:_close( 0 ) | |
| 314 return true | |
| 315 end | |
| 316 end | |
| 317 | |
| 318 function interface_mt:socket() | |
| 319 return self.conn | |
| 320 end | |
| 321 | |
| 322 function interface_mt:server() | |
| 323 return self._server or self; | |
| 324 end | |
| 325 | |
| 326 function interface_mt:port() | |
| 327 return self._port | |
| 328 end | |
| 329 | |
| 330 function interface_mt:serverport() | |
| 331 return self._serverport | |
| 332 end | |
| 333 | |
| 334 function interface_mt:ip() | |
| 335 return self._ip | |
| 336 end | |
| 337 | |
| 338 function interface_mt:ssl() | |
| 339 return self._usingssl | |
| 340 end | |
| 341 interface_mt.clientport = interface_mt.port -- COMPAT server_select | |
| 342 | |
| 343 function interface_mt:type() | |
| 344 return self._type or "client" | |
| 345 end | |
| 346 | |
| 347 function interface_mt:connections() | |
| 348 return self._connections | |
| 349 end | |
| 350 | |
| 351 function interface_mt:address() | |
| 352 return self.addr | |
| 353 end | |
| 354 | |
| 355 function interface_mt:set_sslctx(sslctx) | |
| 356 self._sslctx = sslctx; | |
| 357 if sslctx then | |
| 358 self.starttls = nil; -- use starttls() of interface_mt | |
| 359 else | |
| 360 self.starttls = false; -- prevent starttls() | |
| 361 end | |
| 362 end | |
| 363 | |
| 364 function interface_mt:set_mode(pattern) | |
| 365 if pattern then | |
| 366 self._pattern = pattern; | |
| 367 end | |
| 368 return self._pattern; | |
| 369 end | |
| 370 | |
| 371 function interface_mt:set_send(new_send) | |
| 372 -- No-op, we always use the underlying connection's send | |
| 373 end | |
| 374 | |
| 375 function interface_mt:starttls(sslctx, call_onconnect) | |
| 376 debug( "try to start ssl at client id:", self.id ) | |
| 377 local err | |
| 378 self._sslctx = sslctx; | |
| 379 if self._usingssl then -- startssl was already called | |
| 380 err = "ssl already active" | |
| 381 end | |
| 382 if err then | |
| 383 debug( "error:", err ) | |
| 384 return nil, err | |
| 385 end | |
| 386 self._usingssl = true | |
| 387 self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event | |
| 388 self.startsslcallback = nil | |
| 389 self:_start_ssl(call_onconnect); | |
| 390 self.eventstarthandshake = nil | |
| 391 return -1 | |
| 392 end | |
| 393 if not self.eventwrite then | |
| 394 self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake | |
| 395 self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake | |
| 396 else -- wait until writebuffer is empty | |
| 397 self:_lock( true, true, false ) | |
| 398 debug "ssl session delayed until writebuffer is empty..." | |
| 399 end | |
| 400 self.starttls = false; | |
| 401 return true | |
| 402 end | |
| 403 | |
| 404 function interface_mt:setoption(option, value) | |
| 405 if self.conn.setoption then | |
| 406 return self.conn:setoption(option, value); | |
| 407 end | |
| 408 return false, "setoption not implemented"; | |
| 409 end | |
| 410 | |
| 411 function interface_mt:setlistener(listener) | |
| 412 self:ondetach(); -- Notify listener that it is no longer responsible for this connection | |
| 413 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, | |
| 414 self.onreadtimeout, self.onstatus, self.ondetach | |
| 415 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, | |
| 416 listener.onreadtimeout, listener.onstatus, listener.ondetach; | |
| 417 end | |
| 418 | |
| 419 -- Stub handlers | |
| 420 function interface_mt:onconnect() | |
| 421 end | |
| 422 function interface_mt:onincoming() | |
| 423 end | |
| 424 function interface_mt:ondisconnect() | |
| 425 end | |
| 426 function interface_mt:ontimeout() | |
| 427 end | |
| 428 function interface_mt:onreadtimeout() | |
| 429 self.fatalerror = "timeout during receiving" | |
| 430 debug( "connection failed:", self.fatalerror ) | |
| 431 self:_close() | |
| 432 self.eventread = nil | |
| 433 end | |
| 434 function interface_mt:ondrain() | |
| 435 end | |
| 436 function interface_mt:ondetach() | |
| 437 end | |
| 438 function interface_mt:onstatus() | |
| 439 end | |
| 440 end | |
| 441 | |
| 442 -- End of client interface methods | |
| 443 | |
| 444 local handleclient; | |
| 445 do | |
| 446 function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface | |
| 447 --vdebug("creating client interfacce...") | |
| 448 local interface = { | |
| 449 type = "client"; | |
| 450 conn = client; | |
| 451 currenttime = socket_gettime( ); -- safe the origin | |
| 452 writebuffer = {}; -- writebuffer | |
| 453 writebufferlen = 0; -- length of writebuffer | |
| 454 send = client.send; -- caching table lookups | |
| 455 receive = client.receive; | |
| 456 onconnect = listener.onconnect; -- will be called when client disconnects | |
| 457 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
| 458 onincoming = listener.onincoming; -- will be called when client sends data | |
| 459 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | |
| 460 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
| 461 ondrain = listener.ondrain; -- called when writebuffer is empty | |
| 462 ondetach = listener.ondetach; -- called when disassociating this listener from this connection | |
| 463 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | |
| 464 eventread = false, eventwrite = false, eventclose = false, | |
| 465 eventhandshake = false, eventstarthandshake = false; -- event handler | |
| 466 eventconnect = false, eventsession = false; -- more event handler... | |
| 467 eventwritetimeout = false; -- even more event handler... | |
| 468 eventreadtimeout = false; | |
| 469 fatalerror = false; -- error message | |
| 470 writecallback = false; -- will be called on write events | |
| 471 readcallback = false; -- will be called on read events | |
| 472 nointerface = true; -- lock/unlock parameter of this interface | |
| 473 noreading = false, nowriting = false; -- locks of the read/writecallback | |
| 474 startsslcallback = false; -- starting handshake callback | |
| 475 position = false; -- position of client in interfacelist | |
| 476 | |
| 477 -- Properties | |
| 478 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
| 479 _serverport = (server and server:port() or nil), | |
| 480 _sslctx = sslctx; -- parameters | |
| 481 _usingssl = false; -- client is using ssl; | |
| 482 } | |
| 483 if not has_luasec then interface.starttls = false; end | |
| 484 interface.id = tostring(interface):match("%x+$"); | |
| 485 interface.writecallback = function( event ) -- called on write events | |
| 486 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
| 487 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
| 488 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
| 489 interface.eventwrite = false | |
| 490 return -1 | |
| 491 end | |
| 492 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
| 493 interface.fatalerror = "timeout during writing" | |
| 494 debug( "writing failed:", interface.fatalerror ) | |
| 495 interface:_close() | |
| 496 interface.eventwrite = false | |
| 497 return -1 | |
| 498 else -- can write :) | |
| 499 if interface._usingssl then -- handle luasec | |
| 500 if interface.eventreadtimeout then -- we have to read first | |
| 501 local ret = interface.readcallback( ) -- call readcallback | |
| 502 --vdebug( "tried to read in writecallback, result:", ret ) | |
| 503 end | |
| 504 if interface.eventwritetimeout then -- luasec only | |
| 505 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
| 506 interface.eventwritetimeout = false | |
| 507 end | |
| 508 end | |
| 509 interface.writebuffer = { t_concat(interface.writebuffer) } | |
| 510 local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) | |
| 511 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
| 512 if succ then -- writing succesful | |
| 513 interface.writebuffer[1] = nil | |
| 514 interface.writebufferlen = 0 | |
| 515 interface:ondrain(); | |
| 516 if interface.fatalerror then | |
| 517 debug "closing client after writing" | |
| 518 interface:_close() -- close interface if needed | |
| 519 elseif interface.startsslcallback then -- start ssl connection if needed | |
| 520 debug "starting ssl handshake after writing" | |
| 521 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
| 522 elseif interface.eventreadtimeout then | |
| 523 return EV_WRITE, EV_TIMEOUT | |
| 524 end | |
| 525 interface.eventwrite = nil | |
| 526 return -1 | |
| 527 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again | |
| 528 --vdebug( "writebuffer is not empty:", err ) | |
| 529 interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer | |
| 530 interface.writebufferlen = interface.writebufferlen - byte | |
| 531 if "wantread" == err then -- happens only with luasec | |
| 532 local callback = function( ) | |
| 533 interface:_close() | |
| 534 interface.eventwritetimeout = nil | |
| 535 return -1; | |
| 536 end | 197 end |
| 537 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event | 198 self:_close() |
| 538 debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) | 199 debug( "handshake failed because:", self.fatalerror ) |
| 539 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent | 200 self.eventhandshake = nil |
| 540 return -1 | 201 return -1 |
| 541 end | 202 end |
| 542 return EV_WRITE, cfg.WRITE_TIMEOUT | 203 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... |
| 543 else -- connection was closed during writing or fatal error | 204 end |
| 544 interface.fatalerror = err or "fatal error" | 205 end |
| 545 debug( "connection failed in write event:", interface.fatalerror ) | 206 ) |
| 546 interface:_close() | 207 debug "starting handshake..." |
| 547 interface.eventwrite = nil | 208 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked |
| 209 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) | |
| 210 return true | |
| 211 end | |
| 212 function interface_mt:_destroy() -- close this interface + events and call last listener | |
| 213 debug( "closing client with id:", self.id, self.fatalerror ) | |
| 214 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions | |
| 215 local _ | |
| 216 _ = self.eventread and self.eventread:close( ) | |
| 217 if self.type == "client" then | |
| 218 _ = self.eventwrite and self.eventwrite:close( ) | |
| 219 _ = self.eventhandshake and self.eventhandshake:close( ) | |
| 220 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) | |
| 221 _ = self.eventconnect and self.eventconnect:close( ) | |
| 222 _ = self.eventsession and self.eventsession:close( ) | |
| 223 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) | |
| 224 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) | |
| 225 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) | |
| 226 _ = self.conn and self.conn:close( ) -- close connection | |
| 227 _ = self._server and self._server:counter(-1); | |
| 228 self.eventread, self.eventwrite = nil, nil | |
| 229 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil | |
| 230 self.readcallback, self.writecallback = nil, nil | |
| 231 else | |
| 232 self.conn:close( ) | |
| 233 self.eventread, self.eventclose = nil, nil | |
| 234 self.interface, self.readcallback = nil, nil | |
| 235 end | |
| 236 interfacelist[ self ] = nil | |
| 237 return true | |
| 238 end | |
| 239 | |
| 240 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | |
| 241 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | |
| 242 return nointerface, noreading, nowriting | |
| 243 end | |
| 244 | |
| 245 --TODO: Deprecate | |
| 246 function interface_mt:lock_read(switch) | |
| 247 if switch then | |
| 248 return self:pause(); | |
| 249 else | |
| 250 return self:resume(); | |
| 251 end | |
| 252 end | |
| 253 | |
| 254 function interface_mt:pause() | |
| 255 return self:_lock(self.nointerface, true, self.nowriting); | |
| 256 end | |
| 257 | |
| 258 function interface_mt:resume() | |
| 259 self:_lock(self.nointerface, false, self.nowriting); | |
| 260 if not self.eventread then | |
| 261 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
| 262 end | |
| 263 end | |
| 264 | |
| 265 function interface_mt:counter(c) | |
| 266 if c then | |
| 267 self._connections = self._connections + c | |
| 268 end | |
| 269 return self._connections | |
| 270 end | |
| 271 | |
| 272 -- Public methods | |
| 273 function interface_mt:write(data) | |
| 274 if self.nowriting then return nil, "locked" end | |
| 275 --vdebug( "try to send data to client, id/data:", self.id, data ) | |
| 276 data = tostring( data ) | |
| 277 local len = #data | |
| 278 local total = len + self.writebufferlen | |
| 279 if total > cfg.MAX_SEND_LENGTH then -- check buffer length | |
| 280 local err = "send buffer exceeded" | |
| 281 debug( "error:", err ) -- to much, check your app | |
| 282 return nil, err | |
| 283 end | |
| 284 t_insert(self.writebuffer, data) -- new buffer | |
| 285 self.writebufferlen = total | |
| 286 if not self.eventwrite then -- register new write event | |
| 287 --vdebug( "register new write event" ) | |
| 288 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) | |
| 289 end | |
| 290 return true | |
| 291 end | |
| 292 function interface_mt:close() | |
| 293 if self.nointerface then return nil, "locked"; end | |
| 294 debug( "try to close client connection with id:", self.id ) | |
| 295 if self.type == "client" then | |
| 296 self.fatalerror = "client to close" | |
| 297 if self.eventwrite then -- wait for incomplete write request | |
| 298 self:_lock( true, true, false ) | |
| 299 debug "closing delayed until writebuffer is empty" | |
| 300 return nil, "writebuffer not empty, waiting" | |
| 301 else -- close now | |
| 302 self:_lock( true, true, true ) | |
| 303 self:_close() | |
| 304 return true | |
| 305 end | |
| 306 else | |
| 307 debug( "try to close server with id:", tostring(self.id)) | |
| 308 self.fatalerror = "server to close" | |
| 309 self:_lock( true ) | |
| 310 self:_close( 0 ) | |
| 311 return true | |
| 312 end | |
| 313 end | |
| 314 | |
| 315 function interface_mt:socket() | |
| 316 return self.conn | |
| 317 end | |
| 318 | |
| 319 function interface_mt:server() | |
| 320 return self._server or self; | |
| 321 end | |
| 322 | |
| 323 function interface_mt:port() | |
| 324 return self._port | |
| 325 end | |
| 326 | |
| 327 function interface_mt:serverport() | |
| 328 return self._serverport | |
| 329 end | |
| 330 | |
| 331 function interface_mt:ip() | |
| 332 return self._ip | |
| 333 end | |
| 334 | |
| 335 function interface_mt:ssl() | |
| 336 return self._usingssl | |
| 337 end | |
| 338 interface_mt.clientport = interface_mt.port -- COMPAT server_select | |
| 339 | |
| 340 function interface_mt:type() | |
| 341 return self._type or "client" | |
| 342 end | |
| 343 | |
| 344 function interface_mt:connections() | |
| 345 return self._connections | |
| 346 end | |
| 347 | |
| 348 function interface_mt:address() | |
| 349 return self.addr | |
| 350 end | |
| 351 | |
| 352 function interface_mt:set_sslctx(sslctx) | |
| 353 self._sslctx = sslctx; | |
| 354 if sslctx then | |
| 355 self.starttls = nil; -- use starttls() of interface_mt | |
| 356 else | |
| 357 self.starttls = false; -- prevent starttls() | |
| 358 end | |
| 359 end | |
| 360 | |
| 361 function interface_mt:set_mode(pattern) | |
| 362 if pattern then | |
| 363 self._pattern = pattern; | |
| 364 end | |
| 365 return self._pattern; | |
| 366 end | |
| 367 | |
| 368 function interface_mt:set_send(new_send) | |
| 369 -- No-op, we always use the underlying connection's send | |
| 370 end | |
| 371 | |
| 372 function interface_mt:starttls(sslctx, call_onconnect) | |
| 373 debug( "try to start ssl at client id:", self.id ) | |
| 374 local err | |
| 375 self._sslctx = sslctx; | |
| 376 if self._usingssl then -- startssl was already called | |
| 377 err = "ssl already active" | |
| 378 end | |
| 379 if err then | |
| 380 debug( "error:", err ) | |
| 381 return nil, err | |
| 382 end | |
| 383 self._usingssl = true | |
| 384 self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event | |
| 385 self.startsslcallback = nil | |
| 386 self:_start_ssl(call_onconnect); | |
| 387 self.eventstarthandshake = nil | |
| 388 return -1 | |
| 389 end | |
| 390 if not self.eventwrite then | |
| 391 self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake | |
| 392 self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake | |
| 393 else -- wait until writebuffer is empty | |
| 394 self:_lock( true, true, false ) | |
| 395 debug "ssl session delayed until writebuffer is empty..." | |
| 396 end | |
| 397 self.starttls = false; | |
| 398 return true | |
| 399 end | |
| 400 | |
| 401 function interface_mt:setoption(option, value) | |
| 402 if self.conn.setoption then | |
| 403 return self.conn:setoption(option, value); | |
| 404 end | |
| 405 return false, "setoption not implemented"; | |
| 406 end | |
| 407 | |
| 408 function interface_mt:setlistener(listener) | |
| 409 self:ondetach(); -- Notify listener that it is no longer responsible for this connection | |
| 410 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, | |
| 411 self.onreadtimeout, self.onstatus, self.ondetach | |
| 412 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, | |
| 413 listener.onreadtimeout, listener.onstatus, listener.ondetach; | |
| 414 end | |
| 415 | |
| 416 -- Stub handlers | |
| 417 function interface_mt:onconnect() | |
| 418 end | |
| 419 function interface_mt:onincoming() | |
| 420 end | |
| 421 function interface_mt:ondisconnect() | |
| 422 end | |
| 423 function interface_mt:ontimeout() | |
| 424 end | |
| 425 function interface_mt:onreadtimeout() | |
| 426 self.fatalerror = "timeout during receiving" | |
| 427 debug( "connection failed:", self.fatalerror ) | |
| 428 self:_close() | |
| 429 self.eventread = nil | |
| 430 end | |
| 431 function interface_mt:ondrain() | |
| 432 end | |
| 433 function interface_mt:ondetach() | |
| 434 end | |
| 435 function interface_mt:onstatus() | |
| 436 end | |
| 437 | |
| 438 -- End of client interface methods | |
| 439 | |
| 440 local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface | |
| 441 --vdebug("creating client interfacce...") | |
| 442 local interface = { | |
| 443 type = "client"; | |
| 444 conn = client; | |
| 445 currenttime = socket_gettime( ); -- safe the origin | |
| 446 writebuffer = {}; -- writebuffer | |
| 447 writebufferlen = 0; -- length of writebuffer | |
| 448 send = client.send; -- caching table lookups | |
| 449 receive = client.receive; | |
| 450 onconnect = listener.onconnect; -- will be called when client disconnects | |
| 451 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
| 452 onincoming = listener.onincoming; -- will be called when client sends data | |
| 453 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | |
| 454 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
| 455 ondrain = listener.ondrain; -- called when writebuffer is empty | |
| 456 ondetach = listener.ondetach; -- called when disassociating this listener from this connection | |
| 457 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | |
| 458 eventread = false, eventwrite = false, eventclose = false, | |
| 459 eventhandshake = false, eventstarthandshake = false; -- event handler | |
| 460 eventconnect = false, eventsession = false; -- more event handler... | |
| 461 eventwritetimeout = false; -- even more event handler... | |
| 462 eventreadtimeout = false; | |
| 463 fatalerror = false; -- error message | |
| 464 writecallback = false; -- will be called on write events | |
| 465 readcallback = false; -- will be called on read events | |
| 466 nointerface = true; -- lock/unlock parameter of this interface | |
| 467 noreading = false, nowriting = false; -- locks of the read/writecallback | |
| 468 startsslcallback = false; -- starting handshake callback | |
| 469 position = false; -- position of client in interfacelist | |
| 470 | |
| 471 -- Properties | |
| 472 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
| 473 _serverport = (server and server:port() or nil), | |
| 474 _sslctx = sslctx; -- parameters | |
| 475 _usingssl = false; -- client is using ssl; | |
| 476 } | |
| 477 if not has_luasec then interface.starttls = false; end | |
| 478 interface.id = tostring(interface):match("%x+$"); | |
| 479 interface.writecallback = function( event ) -- called on write events | |
| 480 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
| 481 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
| 482 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
| 483 interface.eventwrite = false | |
| 484 return -1 | |
| 485 end | |
| 486 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
| 487 interface.fatalerror = "timeout during writing" | |
| 488 debug( "writing failed:", interface.fatalerror ) | |
| 489 interface:_close() | |
| 490 interface.eventwrite = false | |
| 491 return -1 | |
| 492 else -- can write :) | |
| 493 if interface._usingssl then -- handle luasec | |
| 494 if interface.eventreadtimeout then -- we have to read first | |
| 495 local ret = interface.readcallback( ) -- call readcallback | |
| 496 --vdebug( "tried to read in writecallback, result:", ret ) | |
| 497 end | |
| 498 if interface.eventwritetimeout then -- luasec only | |
| 499 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
| 500 interface.eventwritetimeout = false | |
| 501 end | |
| 502 end | |
| 503 interface.writebuffer = { t_concat(interface.writebuffer) } | |
| 504 local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) | |
| 505 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
| 506 if succ then -- writing succesful | |
| 507 interface.writebuffer[1] = nil | |
| 508 interface.writebufferlen = 0 | |
| 509 interface:ondrain(); | |
| 510 if interface.fatalerror then | |
| 511 debug "closing client after writing" | |
| 512 interface:_close() -- close interface if needed | |
| 513 elseif interface.startsslcallback then -- start ssl connection if needed | |
| 514 debug "starting ssl handshake after writing" | |
| 515 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
| 516 elseif interface.eventreadtimeout then | |
| 517 return EV_WRITE, EV_TIMEOUT | |
| 518 end | |
| 519 interface.eventwrite = nil | |
| 520 return -1 | |
| 521 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again | |
| 522 --vdebug( "writebuffer is not empty:", err ) | |
| 523 interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer | |
| 524 interface.writebufferlen = interface.writebufferlen - byte | |
| 525 if "wantread" == err then -- happens only with luasec | |
| 526 local callback = function( ) | |
| 527 interface:_close() | |
| 528 interface.eventwritetimeout = nil | |
| 529 return -1; | |
| 530 end | |
| 531 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event | |
| 532 debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) | |
| 533 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent | |
| 548 return -1 | 534 return -1 |
| 549 end | 535 end |
| 550 end | 536 return EV_WRITE, cfg.WRITE_TIMEOUT |
| 551 end | 537 else -- connection was closed during writing or fatal error |
| 552 | 538 interface.fatalerror = err or "fatal error" |
| 553 interface.readcallback = function( event ) -- called on read events | 539 debug( "connection failed in write event:", interface.fatalerror ) |
| 554 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) | 540 interface:_close() |
| 555 if interface.noreading or interface.fatalerror then -- leave this event | 541 interface.eventwrite = nil |
| 556 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) | |
| 557 interface.eventread = nil | |
| 558 return -1 | 542 return -1 |
| 559 end | 543 end |
| 560 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then | 544 end |
| 561 return -1 -- took too long to get some data from client -> disconnect | 545 end |
| 562 end | 546 |
| 563 if interface._usingssl then -- handle luasec | 547 interface.readcallback = function( event ) -- called on read events |
| 564 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | 548 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) |
| 565 local ret = interface.writecallback( ) -- call it | 549 if interface.noreading or interface.fatalerror then -- leave this event |
| 566 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | 550 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) |
| 551 interface.eventread = nil | |
| 552 return -1 | |
| 553 end | |
| 554 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then | |
| 555 return -1 -- took too long to get some data from client -> disconnect | |
| 556 end | |
| 557 if interface._usingssl then -- handle luasec | |
| 558 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | |
| 559 local ret = interface.writecallback( ) -- call it | |
| 560 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | |
| 561 end | |
| 562 if interface.eventreadtimeout then | |
| 563 interface.eventreadtimeout:close( ) | |
| 564 interface.eventreadtimeout = nil | |
| 565 end | |
| 566 end | |
| 567 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | |
| 568 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | |
| 569 buffer = buffer or part | |
| 570 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | |
| 571 interface.fatalerror = "receive buffer exceeded" | |
| 572 debug( "fatal error:", interface.fatalerror ) | |
| 573 interface:_close() | |
| 574 interface.eventread = nil | |
| 575 return -1 | |
| 576 end | |
| 577 if err and ( err ~= "timeout" and err ~= "wantread" ) then | |
| 578 if "wantwrite" == err then -- need to read on write event | |
| 579 if not interface.eventwrite then -- register new write event if needed | |
| 580 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | |
| 567 end | 581 end |
| 568 if interface.eventreadtimeout then | 582 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, |
| 569 interface.eventreadtimeout:close( ) | 583 function( ) |
| 570 interface.eventreadtimeout = nil | 584 interface:_close() |
| 571 end | 585 end, cfg.READ_TIMEOUT |
| 572 end | 586 ) |
| 573 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | 587 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) |
| 574 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | 588 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... |
| 575 buffer = buffer or part | 589 else -- connection was closed or fatal error |
| 576 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | 590 interface.fatalerror = err |
| 577 interface.fatalerror = "receive buffer exceeded" | 591 debug( "connection failed in read event:", interface.fatalerror ) |
| 578 debug( "fatal error:", interface.fatalerror ) | |
| 579 interface:_close() | 592 interface:_close() |
| 580 interface.eventread = nil | 593 interface.eventread = nil |
| 581 return -1 | 594 return -1 |
| 582 end | 595 end |
| 583 if err and ( err ~= "timeout" and err ~= "wantread" ) then | 596 else |
| 584 if "wantwrite" == err then -- need to read on write event | 597 interface.onincoming( interface, buffer, err ) -- send new data to listener |
| 585 if not interface.eventwrite then -- register new write event if needed | 598 end |
| 586 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | 599 if interface.noreading then |
| 587 end | 600 interface.eventread = nil; |
| 588 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, | 601 return -1; |
| 589 function( ) | 602 end |
| 590 interface:_close() | 603 return EV_READ, cfg.READ_TIMEOUT |
| 591 end, cfg.READ_TIMEOUT | 604 end |
| 592 ) | 605 |
| 593 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) | 606 client:settimeout( 0 ) -- set non blocking |
| 594 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... | 607 setmetatable(interface, interface_mt) |
| 595 else -- connection was closed or fatal error | 608 interfacelist[ interface ] = true -- add to interfacelist |
| 596 interface.fatalerror = err | 609 return interface |
| 597 debug( "connection failed in read event:", interface.fatalerror ) | 610 end |
| 598 interface:_close() | 611 |
| 599 interface.eventread = nil | 612 local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface |
| 600 return -1 | 613 debug "creating server interface..." |
| 601 end | 614 local interface = { |
| 615 _connections = 0; | |
| 616 | |
| 617 conn = server; | |
| 618 onconnect = listener.onconnect; -- will be called when new client connected | |
| 619 eventread = false; -- read event handler | |
| 620 eventclose = false; -- close event handler | |
| 621 readcallback = false; -- read event callback | |
| 622 fatalerror = false; -- error message | |
| 623 nointerface = true; -- lock/unlock parameter | |
| 624 | |
| 625 _ip = addr, _port = port, _pattern = pattern, | |
| 626 _sslctx = sslctx; | |
| 627 } | |
| 628 interface.id = tostring(interface):match("%x+$"); | |
| 629 interface.readcallback = function( event ) -- server handler, called on incoming connections | |
| 630 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | |
| 631 if interface.fatalerror then | |
| 632 --vdebug( "leaving this event because:", self.fatalerror ) | |
| 633 interface.eventread = nil | |
| 634 return -1 | |
| 635 end | |
| 636 local delay = cfg.ACCEPT_DELAY | |
| 637 if EV_TIMEOUT == event then | |
| 638 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | |
| 639 debug( "to many connections, seconds to wait for next accept:", delay ) | |
| 640 return EV_TIMEOUT, delay -- timeout... | |
| 602 else | 641 else |
| 603 interface.onincoming( interface, buffer, err ) -- send new data to listener | 642 return EV_READ -- accept again |
| 604 end | 643 end |
| 605 if interface.noreading then | 644 end |
| 606 interface.eventread = nil; | 645 --vdebug("max connection check ok, accepting...") |
| 607 return -1; | 646 local client, err = server:accept() -- try to accept; TODO: check err |
| 608 end | 647 while client do |
| 609 return EV_READ, cfg.READ_TIMEOUT | 648 if interface._connections >= cfg.MAX_CONNECTIONS then |
| 610 end | 649 client:close( ) -- refuse connection |
| 611 | 650 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) |
| 612 client:settimeout( 0 ) -- set non blocking | 651 return EV_TIMEOUT, delay -- delay for next accept attempt |
| 613 setmetatable(interface, interface_mt) | 652 end |
| 614 interfacelist[ interface ] = true -- add to interfacelist | 653 local client_ip, client_port = client:getpeername( ) |
| 615 return interface | 654 interface._connections = interface._connections + 1 -- increase connection count |
| 616 end | 655 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) |
| 617 end | 656 --vdebug( "client id:", clientinterface, "startssl:", startssl ) |
| 618 | 657 if has_luasec and sslctx then |
| 619 local handleserver | 658 clientinterface:starttls(sslctx, true) |
| 620 do | |
| 621 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface | |
| 622 debug "creating server interface..." | |
| 623 local interface = { | |
| 624 _connections = 0; | |
| 625 | |
| 626 conn = server; | |
| 627 onconnect = listener.onconnect; -- will be called when new client connected | |
| 628 eventread = false; -- read event handler | |
| 629 eventclose = false; -- close event handler | |
| 630 readcallback = false; -- read event callback | |
| 631 fatalerror = false; -- error message | |
| 632 nointerface = true; -- lock/unlock parameter | |
| 633 | |
| 634 _ip = addr, _port = port, _pattern = pattern, | |
| 635 _sslctx = sslctx; | |
| 636 } | |
| 637 interface.id = tostring(interface):match("%x+$"); | |
| 638 interface.readcallback = function( event ) -- server handler, called on incoming connections | |
| 639 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | |
| 640 if interface.fatalerror then | |
| 641 --vdebug( "leaving this event because:", self.fatalerror ) | |
| 642 interface.eventread = nil | |
| 643 return -1 | |
| 644 end | |
| 645 local delay = cfg.ACCEPT_DELAY | |
| 646 if EV_TIMEOUT == event then | |
| 647 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | |
| 648 debug( "to many connections, seconds to wait for next accept:", delay ) | |
| 649 return EV_TIMEOUT, delay -- timeout... | |
| 650 else | |
| 651 return EV_READ -- accept again | |
| 652 end | |
| 653 end | |
| 654 --vdebug("max connection check ok, accepting...") | |
| 655 local client, err = server:accept() -- try to accept; TODO: check err | |
| 656 while client do | |
| 657 if interface._connections >= cfg.MAX_CONNECTIONS then | |
| 658 client:close( ) -- refuse connection | |
| 659 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) | |
| 660 return EV_TIMEOUT, delay -- delay for next accept attempt | |
| 661 end | |
| 662 local client_ip, client_port = client:getpeername( ) | |
| 663 interface._connections = interface._connections + 1 -- increase connection count | |
| 664 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) | |
| 665 --vdebug( "client id:", clientinterface, "startssl:", startssl ) | |
| 666 if has_luasec and sslctx then | |
| 667 clientinterface:starttls(sslctx, true) | |
| 668 else | |
| 669 clientinterface:_start_session( true ) | |
| 670 end | |
| 671 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); | |
| 672 | |
| 673 client, err = server:accept() -- try to accept again | |
| 674 end | |
| 675 return EV_READ | |
| 676 end | |
| 677 | |
| 678 server:settimeout( 0 ) | |
| 679 setmetatable(interface, interface_mt) | |
| 680 interfacelist[ interface ] = true | |
| 681 interface:_start_session() | |
| 682 return interface | |
| 683 end | |
| 684 end | |
| 685 | |
| 686 local addserver = ( function( ) | |
| 687 return function( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments | |
| 688 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") | |
| 689 if sslctx and not has_luasec then | |
| 690 debug "fatal error: luasec not found" | |
| 691 return nil, "luasec not found" | |
| 692 end | |
| 693 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket | |
| 694 if not server then | |
| 695 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) | |
| 696 return nil, err | |
| 697 end | |
| 698 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler | |
| 699 debug( "new server created with id:", tostring(interface)) | |
| 700 return interface | |
| 701 end | |
| 702 end )( ) | |
| 703 | |
| 704 local addclient, wrapclient | |
| 705 do | |
| 706 function wrapclient( client, ip, port, listeners, pattern, sslctx ) | |
| 707 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) | |
| 708 interface:_start_connection(sslctx) | |
| 709 return interface, client | |
| 710 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | |
| 711 end | |
| 712 | |
| 713 function addclient( addr, serverport, listener, pattern, sslctx, typ ) | |
| 714 if sslctx and not has_luasec then | |
| 715 debug "need luasec, but not available" | |
| 716 return nil, "luasec not found" | |
| 717 end | |
| 718 if not typ then | |
| 719 local addrinfo, err = getaddrinfo(addr) | |
| 720 if not addrinfo then return nil, err end | |
| 721 if addrinfo[1] and addrinfo[1].family == "inet6" then | |
| 722 typ = "tcp6" | |
| 723 else | 659 else |
| 724 typ = "tcp" | 660 clientinterface:_start_session( true ) |
| 725 end | 661 end |
| 726 end | 662 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); |
| 727 local create = socket[typ] | 663 |
| 728 if type( create ) ~= "function" then | 664 client, err = server:accept() -- try to accept again |
| 729 return nil, "invalid socket type" | 665 end |
| 730 end | 666 return EV_READ |
| 731 local client, err = create() -- creating new socket | 667 end |
| 732 if not client then | 668 |
| 733 debug( "cannot create socket:", err ) | 669 server:settimeout( 0 ) |
| 734 return nil, err | 670 setmetatable(interface, interface_mt) |
| 735 end | 671 interfacelist[ interface ] = true |
| 736 client:settimeout( 0 ) -- set nonblocking | 672 interface:_start_session() |
| 737 local res, err = client:connect( addr, serverport ) -- connect | 673 return interface |
| 738 if res or ( err == "timeout" ) then | 674 end |
| 739 local ip, port = client:getsockname( ) | 675 |
| 740 local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) | 676 local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments |
| 741 interface:_start_connection( startssl ) | 677 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") |
| 742 debug( "new connection id:", interface.id ) | 678 if sslctx and not has_luasec then |
| 743 return interface, err | 679 debug "fatal error: luasec not found" |
| 680 return nil, "luasec not found" | |
| 681 end | |
| 682 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket | |
| 683 if not server then | |
| 684 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) | |
| 685 return nil, err | |
| 686 end | |
| 687 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler | |
| 688 debug( "new server created with id:", tostring(interface)) | |
| 689 return interface | |
| 690 end | |
| 691 | |
| 692 local function wrapclient( client, ip, port, listeners, pattern, sslctx ) | |
| 693 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) | |
| 694 interface:_start_connection(sslctx) | |
| 695 return interface, client | |
| 696 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | |
| 697 end | |
| 698 | |
| 699 local function addclient( addr, serverport, listener, pattern, sslctx, typ ) | |
| 700 if sslctx and not has_luasec then | |
| 701 debug "need luasec, but not available" | |
| 702 return nil, "luasec not found" | |
| 703 end | |
| 704 if not typ then | |
| 705 local addrinfo, err = getaddrinfo(addr) | |
| 706 if not addrinfo then return nil, err end | |
| 707 if addrinfo[1] and addrinfo[1].family == "inet6" then | |
| 708 typ = "tcp6" | |
| 744 else | 709 else |
| 745 debug( "new connection failed:", err ) | 710 typ = "tcp" |
| 746 return nil, err | 711 end |
| 747 end | 712 end |
| 748 end | 713 local create = socket[typ] |
| 749 end | 714 if type( create ) ~= "function" then |
| 750 | 715 return nil, "invalid socket type" |
| 751 | 716 end |
| 752 local loop = function( ) -- starts the event loop | 717 local client, err = create() -- creating new socket |
| 718 if not client then | |
| 719 debug( "cannot create socket:", err ) | |
| 720 return nil, err | |
| 721 end | |
| 722 client:settimeout( 0 ) -- set nonblocking | |
| 723 local res, err = client:connect( addr, serverport ) -- connect | |
| 724 if res or ( err == "timeout" ) then | |
| 725 local ip, port = client:getsockname( ) | |
| 726 local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) | |
| 727 interface:_start_connection( startssl ) | |
| 728 debug( "new connection id:", interface.id ) | |
| 729 return interface, err | |
| 730 else | |
| 731 debug( "new connection failed:", err ) | |
| 732 return nil, err | |
| 733 end | |
| 734 end | |
| 735 | |
| 736 local function loop( ) -- starts the event loop | |
| 753 base:loop( ) | 737 base:loop( ) |
| 754 return "quitting"; | 738 return "quitting"; |
| 755 end | 739 end |
| 756 | 740 |
| 757 local function newevent( ... ) | 741 local function newevent( ... ) |
| 758 return addevent( base, ... ) | 742 return addevent( base, ... ) |
| 759 end | 743 end |
| 760 | 744 |
| 761 local closeallservers = function( arg ) | 745 local function closeallservers ( arg ) |
| 762 for item in pairs( interfacelist ) do | 746 for item in pairs( interfacelist ) do |
| 763 if item.type == "server" then | 747 if item.type == "server" then |
| 764 item:close( arg ) | 748 item:close( arg ) |
| 765 end | 749 end |
| 766 end | 750 end |
| 812 end | 796 end |
| 813 sender:set_mode("*a"); | 797 sender:set_mode("*a"); |
| 814 end | 798 end |
| 815 | 799 |
| 816 return { | 800 return { |
| 817 | |
| 818 cfg = cfg, | 801 cfg = cfg, |
| 819 base = base, | 802 base = base, |
| 820 loop = loop, | 803 loop = loop, |
| 821 link = link, | 804 link = link, |
| 822 event = event, | 805 event = event, |