Software /
code /
prosody
Comparison
net/server_event.lua @ 6851:1f1bed8ebc41
server_event: Remove needless scoping and indentation
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Fri, 25 Sep 2015 17:12:55 +0200 |
parent | 6850:41de00647ad3 |
child | 6852:626d8152b1ad |
comparison
equal
deleted
inserted
replaced
6850:41de00647ad3 | 6851:1f1bed8ebc41 |
---|---|
87 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) | 87 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) |
88 | 88 |
89 local interfacelist = { } | 89 local interfacelist = { } |
90 | 90 |
91 -- Client interface methods | 91 -- Client interface methods |
92 local interface_mt | 92 local interface_mt = {}; interface_mt.__index = interface_mt; |
93 do | 93 |
94 interface_mt = {}; interface_mt.__index = interface_mt; | 94 -- Private methods |
95 | 95 function interface_mt:_close() |
96 | 96 return self:_destroy(); |
97 -- Private methods | 97 end |
98 function interface_mt:_close() | 98 |
99 return self:_destroy(); | 99 function interface_mt:_start_connection(plainssl) -- should be called from addclient |
100 end | 100 local callback = function( event ) |
101 | 101 if EV_TIMEOUT == event then -- timeout during connection |
102 function interface_mt:_start_connection(plainssl) -- should be called from addclient | 102 self.fatalerror = "connection timeout" |
103 local callback = function( event ) | 103 self:ontimeout() -- call timeout listener |
104 if EV_TIMEOUT == event then -- timeout during connection | 104 self:_close() |
105 self.fatalerror = "connection timeout" | 105 debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) |
106 self:ontimeout() -- call timeout listener | 106 else |
107 self:_close() | 107 if plainssl and has_luasec then -- start ssl session |
108 debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) | 108 self:starttls(self._sslctx, true) |
109 else | 109 else -- normal connection |
110 if plainssl and has_luasec then -- start ssl session | 110 self:_start_session(true) |
111 self:starttls(self._sslctx, true) | |
112 else -- normal connection | |
113 self:_start_session(true) | |
114 end | |
115 debug( "new connection established. id:", self.id ) | |
116 end | 111 end |
117 self.eventconnect = nil | 112 debug( "new connection established. id:", self.id ) |
118 return -1 | 113 end |
119 end | 114 self.eventconnect = nil |
120 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) | 115 return -1 |
121 return true | 116 end |
122 end | 117 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) |
123 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl | |
124 if self.type == "client" then | |
125 local callback = function( ) | |
126 self:_lock( false, false, false ) | |
127 --vdebug( "start listening on client socket with id:", self.id ) | |
128 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
129 if call_onconnect then | |
130 self:onconnect() | |
131 end | |
132 self.eventsession = nil | |
133 return -1 | |
134 end | |
135 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) | |
136 else | |
137 self:_lock( false ) | |
138 --vdebug( "start listening on server socket with id:", self.id ) | |
139 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback | |
140 end | |
141 return true | 118 return true |
142 end | 119 end |
143 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first | 120 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl |
144 --vdebug( "starting ssl session with client id:", self.id ) | 121 if self.type == "client" then |
145 local _ | 122 local callback = function( ) |
146 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! | 123 self:_lock( false, false, false ) |
147 _ = self.eventwrite and self.eventwrite:close( ) | 124 --vdebug( "start listening on client socket with id:", self.id ) |
148 self.eventread, self.eventwrite = nil, nil | 125 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback |
149 local err | 126 if call_onconnect then |
150 self.conn, err = ssl.wrap( self.conn, self._sslctx ) | 127 self:onconnect() |
151 if err then | 128 end |
152 self.fatalerror = err | 129 self.eventsession = nil |
153 self.conn = nil -- cannot be used anymore | 130 return -1 |
154 if call_onconnect then | 131 end |
155 self.ondisconnect = nil -- dont call this when client isnt really connected | 132 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) |
156 end | 133 else |
157 self:_close() | 134 self:_lock( false ) |
158 debug( "fatal error while ssl wrapping:", err ) | 135 --vdebug( "start listening on server socket with id:", self.id ) |
159 return false | 136 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback |
160 end | 137 end |
161 self.conn:settimeout( 0 ) -- set non blocking | 138 return true |
162 local handshakecallback = coroutine_wrap( | 139 end |
163 function( event ) | 140 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first |
164 local _, err | 141 --vdebug( "starting ssl session with client id:", self.id ) |
165 local attempt = 0 | 142 local _ |
166 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS | 143 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! |
167 while attempt < maxattempt do -- no endless loop | 144 _ = self.eventwrite and self.eventwrite:close( ) |
168 attempt = attempt + 1 | 145 self.eventread, self.eventwrite = nil, nil |
169 debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) | 146 local err |
170 if attempt > maxattempt then | 147 self.conn, err = ssl.wrap( self.conn, self._sslctx ) |
171 self.fatalerror = "max handshake attempts exceeded" | 148 if err then |
172 elseif EV_TIMEOUT == event then | 149 self.fatalerror = err |
173 self.fatalerror = "timeout during handshake" | 150 self.conn = nil -- cannot be used anymore |
174 else | 151 if call_onconnect then |
175 _, err = self.conn:dohandshake( ) | 152 self.ondisconnect = nil -- dont call this when client isnt really connected |
176 if not err then | 153 end |
177 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | 154 self:_close() |
178 self.send = self.conn.send -- caching table lookups with new client object | 155 debug( "fatal error while ssl wrapping:", err ) |
179 self.receive = self.conn.receive | 156 return false |
180 if not call_onconnect then -- trigger listener | 157 end |
181 self:onstatus("ssl-handshake-complete"); | 158 self.conn:settimeout( 0 ) -- set non blocking |
182 end | 159 local handshakecallback = coroutine_wrap( |
183 self:_start_session( call_onconnect ) | 160 function( event ) |
184 debug( "ssl handshake done" ) | 161 local _, err |
185 self.eventhandshake = nil | 162 local attempt = 0 |
186 return -1 | 163 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS |
164 while attempt < maxattempt do -- no endless loop | |
165 attempt = attempt + 1 | |
166 debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) | |
167 if attempt > maxattempt then | |
168 self.fatalerror = "max handshake attempts exceeded" | |
169 elseif EV_TIMEOUT == event then | |
170 self.fatalerror = "timeout during handshake" | |
171 else | |
172 _, err = self.conn:dohandshake( ) | |
173 if not err then | |
174 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | |
175 self.send = self.conn.send -- caching table lookups with new client object | |
176 self.receive = self.conn.receive | |
177 if not call_onconnect then -- trigger listener | |
178 self:onstatus("ssl-handshake-complete"); | |
187 end | 179 end |
188 if err == "wantwrite" then | 180 self:_start_session( call_onconnect ) |
189 event = EV_WRITE | 181 debug( "ssl handshake done" ) |
190 elseif err == "wantread" then | |
191 event = EV_READ | |
192 else | |
193 debug( "ssl handshake error:", err ) | |
194 self.fatalerror = err | |
195 end | |
196 end | |
197 if self.fatalerror then | |
198 if call_onconnect then | |
199 self.ondisconnect = nil -- dont call this when client isnt really connected | |
200 end | |
201 self:_close() | |
202 debug( "handshake failed because:", self.fatalerror ) | |
203 self.eventhandshake = nil | 182 self.eventhandshake = nil |
204 return -1 | 183 return -1 |
205 end | 184 end |
206 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... | 185 if err == "wantwrite" then |
186 event = EV_WRITE | |
187 elseif err == "wantread" then | |
188 event = EV_READ | |
189 else | |
190 debug( "ssl handshake error:", err ) | |
191 self.fatalerror = err | |
192 end | |
207 end | 193 end |
208 end | 194 if self.fatalerror then |
209 ) | 195 if call_onconnect then |
210 debug "starting handshake..." | 196 self.ondisconnect = nil -- dont call this when client isnt really connected |
211 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked | |
212 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) | |
213 return true | |
214 end | |
215 function interface_mt:_destroy() -- close this interface + events and call last listener | |
216 debug( "closing client with id:", self.id, self.fatalerror ) | |
217 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions | |
218 local _ | |
219 _ = self.eventread and self.eventread:close( ) | |
220 if self.type == "client" then | |
221 _ = self.eventwrite and self.eventwrite:close( ) | |
222 _ = self.eventhandshake and self.eventhandshake:close( ) | |
223 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) | |
224 _ = self.eventconnect and self.eventconnect:close( ) | |
225 _ = self.eventsession and self.eventsession:close( ) | |
226 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) | |
227 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) | |
228 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) | |
229 _ = self.conn and self.conn:close( ) -- close connection | |
230 _ = self._server and self._server:counter(-1); | |
231 self.eventread, self.eventwrite = nil, nil | |
232 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil | |
233 self.readcallback, self.writecallback = nil, nil | |
234 else | |
235 self.conn:close( ) | |
236 self.eventread, self.eventclose = nil, nil | |
237 self.interface, self.readcallback = nil, nil | |
238 end | |
239 interfacelist[ self ] = nil | |
240 return true | |
241 end | |
242 | |
243 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | |
244 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | |
245 return nointerface, noreading, nowriting | |
246 end | |
247 | |
248 --TODO: Deprecate | |
249 function interface_mt:lock_read(switch) | |
250 if switch then | |
251 return self:pause(); | |
252 else | |
253 return self:resume(); | |
254 end | |
255 end | |
256 | |
257 function interface_mt:pause() | |
258 return self:_lock(self.nointerface, true, self.nowriting); | |
259 end | |
260 | |
261 function interface_mt:resume() | |
262 self:_lock(self.nointerface, false, self.nowriting); | |
263 if not self.eventread then | |
264 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
265 end | |
266 end | |
267 | |
268 function interface_mt:counter(c) | |
269 if c then | |
270 self._connections = self._connections + c | |
271 end | |
272 return self._connections | |
273 end | |
274 | |
275 -- Public methods | |
276 function interface_mt:write(data) | |
277 if self.nowriting then return nil, "locked" end | |
278 --vdebug( "try to send data to client, id/data:", self.id, data ) | |
279 data = tostring( data ) | |
280 local len = #data | |
281 local total = len + self.writebufferlen | |
282 if total > cfg.MAX_SEND_LENGTH then -- check buffer length | |
283 local err = "send buffer exceeded" | |
284 debug( "error:", err ) -- to much, check your app | |
285 return nil, err | |
286 end | |
287 t_insert(self.writebuffer, data) -- new buffer | |
288 self.writebufferlen = total | |
289 if not self.eventwrite then -- register new write event | |
290 --vdebug( "register new write event" ) | |
291 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) | |
292 end | |
293 return true | |
294 end | |
295 function interface_mt:close() | |
296 if self.nointerface then return nil, "locked"; end | |
297 debug( "try to close client connection with id:", self.id ) | |
298 if self.type == "client" then | |
299 self.fatalerror = "client to close" | |
300 if self.eventwrite then -- wait for incomplete write request | |
301 self:_lock( true, true, false ) | |
302 debug "closing delayed until writebuffer is empty" | |
303 return nil, "writebuffer not empty, waiting" | |
304 else -- close now | |
305 self:_lock( true, true, true ) | |
306 self:_close() | |
307 return true | |
308 end | |
309 else | |
310 debug( "try to close server with id:", tostring(self.id)) | |
311 self.fatalerror = "server to close" | |
312 self:_lock( true ) | |
313 self:_close( 0 ) | |
314 return true | |
315 end | |
316 end | |
317 | |
318 function interface_mt:socket() | |
319 return self.conn | |
320 end | |
321 | |
322 function interface_mt:server() | |
323 return self._server or self; | |
324 end | |
325 | |
326 function interface_mt:port() | |
327 return self._port | |
328 end | |
329 | |
330 function interface_mt:serverport() | |
331 return self._serverport | |
332 end | |
333 | |
334 function interface_mt:ip() | |
335 return self._ip | |
336 end | |
337 | |
338 function interface_mt:ssl() | |
339 return self._usingssl | |
340 end | |
341 interface_mt.clientport = interface_mt.port -- COMPAT server_select | |
342 | |
343 function interface_mt:type() | |
344 return self._type or "client" | |
345 end | |
346 | |
347 function interface_mt:connections() | |
348 return self._connections | |
349 end | |
350 | |
351 function interface_mt:address() | |
352 return self.addr | |
353 end | |
354 | |
355 function interface_mt:set_sslctx(sslctx) | |
356 self._sslctx = sslctx; | |
357 if sslctx then | |
358 self.starttls = nil; -- use starttls() of interface_mt | |
359 else | |
360 self.starttls = false; -- prevent starttls() | |
361 end | |
362 end | |
363 | |
364 function interface_mt:set_mode(pattern) | |
365 if pattern then | |
366 self._pattern = pattern; | |
367 end | |
368 return self._pattern; | |
369 end | |
370 | |
371 function interface_mt:set_send(new_send) | |
372 -- No-op, we always use the underlying connection's send | |
373 end | |
374 | |
375 function interface_mt:starttls(sslctx, call_onconnect) | |
376 debug( "try to start ssl at client id:", self.id ) | |
377 local err | |
378 self._sslctx = sslctx; | |
379 if self._usingssl then -- startssl was already called | |
380 err = "ssl already active" | |
381 end | |
382 if err then | |
383 debug( "error:", err ) | |
384 return nil, err | |
385 end | |
386 self._usingssl = true | |
387 self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event | |
388 self.startsslcallback = nil | |
389 self:_start_ssl(call_onconnect); | |
390 self.eventstarthandshake = nil | |
391 return -1 | |
392 end | |
393 if not self.eventwrite then | |
394 self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake | |
395 self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake | |
396 else -- wait until writebuffer is empty | |
397 self:_lock( true, true, false ) | |
398 debug "ssl session delayed until writebuffer is empty..." | |
399 end | |
400 self.starttls = false; | |
401 return true | |
402 end | |
403 | |
404 function interface_mt:setoption(option, value) | |
405 if self.conn.setoption then | |
406 return self.conn:setoption(option, value); | |
407 end | |
408 return false, "setoption not implemented"; | |
409 end | |
410 | |
411 function interface_mt:setlistener(listener) | |
412 self:ondetach(); -- Notify listener that it is no longer responsible for this connection | |
413 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, | |
414 self.onreadtimeout, self.onstatus, self.ondetach | |
415 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, | |
416 listener.onreadtimeout, listener.onstatus, listener.ondetach; | |
417 end | |
418 | |
419 -- Stub handlers | |
420 function interface_mt:onconnect() | |
421 end | |
422 function interface_mt:onincoming() | |
423 end | |
424 function interface_mt:ondisconnect() | |
425 end | |
426 function interface_mt:ontimeout() | |
427 end | |
428 function interface_mt:onreadtimeout() | |
429 self.fatalerror = "timeout during receiving" | |
430 debug( "connection failed:", self.fatalerror ) | |
431 self:_close() | |
432 self.eventread = nil | |
433 end | |
434 function interface_mt:ondrain() | |
435 end | |
436 function interface_mt:ondetach() | |
437 end | |
438 function interface_mt:onstatus() | |
439 end | |
440 end | |
441 | |
442 -- End of client interface methods | |
443 | |
444 local handleclient; | |
445 do | |
446 function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface | |
447 --vdebug("creating client interfacce...") | |
448 local interface = { | |
449 type = "client"; | |
450 conn = client; | |
451 currenttime = socket_gettime( ); -- safe the origin | |
452 writebuffer = {}; -- writebuffer | |
453 writebufferlen = 0; -- length of writebuffer | |
454 send = client.send; -- caching table lookups | |
455 receive = client.receive; | |
456 onconnect = listener.onconnect; -- will be called when client disconnects | |
457 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
458 onincoming = listener.onincoming; -- will be called when client sends data | |
459 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | |
460 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
461 ondrain = listener.ondrain; -- called when writebuffer is empty | |
462 ondetach = listener.ondetach; -- called when disassociating this listener from this connection | |
463 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | |
464 eventread = false, eventwrite = false, eventclose = false, | |
465 eventhandshake = false, eventstarthandshake = false; -- event handler | |
466 eventconnect = false, eventsession = false; -- more event handler... | |
467 eventwritetimeout = false; -- even more event handler... | |
468 eventreadtimeout = false; | |
469 fatalerror = false; -- error message | |
470 writecallback = false; -- will be called on write events | |
471 readcallback = false; -- will be called on read events | |
472 nointerface = true; -- lock/unlock parameter of this interface | |
473 noreading = false, nowriting = false; -- locks of the read/writecallback | |
474 startsslcallback = false; -- starting handshake callback | |
475 position = false; -- position of client in interfacelist | |
476 | |
477 -- Properties | |
478 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
479 _serverport = (server and server:port() or nil), | |
480 _sslctx = sslctx; -- parameters | |
481 _usingssl = false; -- client is using ssl; | |
482 } | |
483 if not has_luasec then interface.starttls = false; end | |
484 interface.id = tostring(interface):match("%x+$"); | |
485 interface.writecallback = function( event ) -- called on write events | |
486 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
487 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
488 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
489 interface.eventwrite = false | |
490 return -1 | |
491 end | |
492 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
493 interface.fatalerror = "timeout during writing" | |
494 debug( "writing failed:", interface.fatalerror ) | |
495 interface:_close() | |
496 interface.eventwrite = false | |
497 return -1 | |
498 else -- can write :) | |
499 if interface._usingssl then -- handle luasec | |
500 if interface.eventreadtimeout then -- we have to read first | |
501 local ret = interface.readcallback( ) -- call readcallback | |
502 --vdebug( "tried to read in writecallback, result:", ret ) | |
503 end | |
504 if interface.eventwritetimeout then -- luasec only | |
505 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
506 interface.eventwritetimeout = false | |
507 end | |
508 end | |
509 interface.writebuffer = { t_concat(interface.writebuffer) } | |
510 local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) | |
511 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
512 if succ then -- writing succesful | |
513 interface.writebuffer[1] = nil | |
514 interface.writebufferlen = 0 | |
515 interface:ondrain(); | |
516 if interface.fatalerror then | |
517 debug "closing client after writing" | |
518 interface:_close() -- close interface if needed | |
519 elseif interface.startsslcallback then -- start ssl connection if needed | |
520 debug "starting ssl handshake after writing" | |
521 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
522 elseif interface.eventreadtimeout then | |
523 return EV_WRITE, EV_TIMEOUT | |
524 end | |
525 interface.eventwrite = nil | |
526 return -1 | |
527 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again | |
528 --vdebug( "writebuffer is not empty:", err ) | |
529 interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer | |
530 interface.writebufferlen = interface.writebufferlen - byte | |
531 if "wantread" == err then -- happens only with luasec | |
532 local callback = function( ) | |
533 interface:_close() | |
534 interface.eventwritetimeout = nil | |
535 return -1; | |
536 end | 197 end |
537 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event | 198 self:_close() |
538 debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) | 199 debug( "handshake failed because:", self.fatalerror ) |
539 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent | 200 self.eventhandshake = nil |
540 return -1 | 201 return -1 |
541 end | 202 end |
542 return EV_WRITE, cfg.WRITE_TIMEOUT | 203 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... |
543 else -- connection was closed during writing or fatal error | 204 end |
544 interface.fatalerror = err or "fatal error" | 205 end |
545 debug( "connection failed in write event:", interface.fatalerror ) | 206 ) |
546 interface:_close() | 207 debug "starting handshake..." |
547 interface.eventwrite = nil | 208 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked |
209 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) | |
210 return true | |
211 end | |
212 function interface_mt:_destroy() -- close this interface + events and call last listener | |
213 debug( "closing client with id:", self.id, self.fatalerror ) | |
214 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions | |
215 local _ | |
216 _ = self.eventread and self.eventread:close( ) | |
217 if self.type == "client" then | |
218 _ = self.eventwrite and self.eventwrite:close( ) | |
219 _ = self.eventhandshake and self.eventhandshake:close( ) | |
220 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) | |
221 _ = self.eventconnect and self.eventconnect:close( ) | |
222 _ = self.eventsession and self.eventsession:close( ) | |
223 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) | |
224 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) | |
225 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) | |
226 _ = self.conn and self.conn:close( ) -- close connection | |
227 _ = self._server and self._server:counter(-1); | |
228 self.eventread, self.eventwrite = nil, nil | |
229 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil | |
230 self.readcallback, self.writecallback = nil, nil | |
231 else | |
232 self.conn:close( ) | |
233 self.eventread, self.eventclose = nil, nil | |
234 self.interface, self.readcallback = nil, nil | |
235 end | |
236 interfacelist[ self ] = nil | |
237 return true | |
238 end | |
239 | |
240 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | |
241 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | |
242 return nointerface, noreading, nowriting | |
243 end | |
244 | |
245 --TODO: Deprecate | |
246 function interface_mt:lock_read(switch) | |
247 if switch then | |
248 return self:pause(); | |
249 else | |
250 return self:resume(); | |
251 end | |
252 end | |
253 | |
254 function interface_mt:pause() | |
255 return self:_lock(self.nointerface, true, self.nowriting); | |
256 end | |
257 | |
258 function interface_mt:resume() | |
259 self:_lock(self.nointerface, false, self.nowriting); | |
260 if not self.eventread then | |
261 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
262 end | |
263 end | |
264 | |
265 function interface_mt:counter(c) | |
266 if c then | |
267 self._connections = self._connections + c | |
268 end | |
269 return self._connections | |
270 end | |
271 | |
272 -- Public methods | |
273 function interface_mt:write(data) | |
274 if self.nowriting then return nil, "locked" end | |
275 --vdebug( "try to send data to client, id/data:", self.id, data ) | |
276 data = tostring( data ) | |
277 local len = #data | |
278 local total = len + self.writebufferlen | |
279 if total > cfg.MAX_SEND_LENGTH then -- check buffer length | |
280 local err = "send buffer exceeded" | |
281 debug( "error:", err ) -- to much, check your app | |
282 return nil, err | |
283 end | |
284 t_insert(self.writebuffer, data) -- new buffer | |
285 self.writebufferlen = total | |
286 if not self.eventwrite then -- register new write event | |
287 --vdebug( "register new write event" ) | |
288 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) | |
289 end | |
290 return true | |
291 end | |
292 function interface_mt:close() | |
293 if self.nointerface then return nil, "locked"; end | |
294 debug( "try to close client connection with id:", self.id ) | |
295 if self.type == "client" then | |
296 self.fatalerror = "client to close" | |
297 if self.eventwrite then -- wait for incomplete write request | |
298 self:_lock( true, true, false ) | |
299 debug "closing delayed until writebuffer is empty" | |
300 return nil, "writebuffer not empty, waiting" | |
301 else -- close now | |
302 self:_lock( true, true, true ) | |
303 self:_close() | |
304 return true | |
305 end | |
306 else | |
307 debug( "try to close server with id:", tostring(self.id)) | |
308 self.fatalerror = "server to close" | |
309 self:_lock( true ) | |
310 self:_close( 0 ) | |
311 return true | |
312 end | |
313 end | |
314 | |
315 function interface_mt:socket() | |
316 return self.conn | |
317 end | |
318 | |
319 function interface_mt:server() | |
320 return self._server or self; | |
321 end | |
322 | |
323 function interface_mt:port() | |
324 return self._port | |
325 end | |
326 | |
327 function interface_mt:serverport() | |
328 return self._serverport | |
329 end | |
330 | |
331 function interface_mt:ip() | |
332 return self._ip | |
333 end | |
334 | |
335 function interface_mt:ssl() | |
336 return self._usingssl | |
337 end | |
338 interface_mt.clientport = interface_mt.port -- COMPAT server_select | |
339 | |
340 function interface_mt:type() | |
341 return self._type or "client" | |
342 end | |
343 | |
344 function interface_mt:connections() | |
345 return self._connections | |
346 end | |
347 | |
348 function interface_mt:address() | |
349 return self.addr | |
350 end | |
351 | |
352 function interface_mt:set_sslctx(sslctx) | |
353 self._sslctx = sslctx; | |
354 if sslctx then | |
355 self.starttls = nil; -- use starttls() of interface_mt | |
356 else | |
357 self.starttls = false; -- prevent starttls() | |
358 end | |
359 end | |
360 | |
361 function interface_mt:set_mode(pattern) | |
362 if pattern then | |
363 self._pattern = pattern; | |
364 end | |
365 return self._pattern; | |
366 end | |
367 | |
368 function interface_mt:set_send(new_send) | |
369 -- No-op, we always use the underlying connection's send | |
370 end | |
371 | |
372 function interface_mt:starttls(sslctx, call_onconnect) | |
373 debug( "try to start ssl at client id:", self.id ) | |
374 local err | |
375 self._sslctx = sslctx; | |
376 if self._usingssl then -- startssl was already called | |
377 err = "ssl already active" | |
378 end | |
379 if err then | |
380 debug( "error:", err ) | |
381 return nil, err | |
382 end | |
383 self._usingssl = true | |
384 self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event | |
385 self.startsslcallback = nil | |
386 self:_start_ssl(call_onconnect); | |
387 self.eventstarthandshake = nil | |
388 return -1 | |
389 end | |
390 if not self.eventwrite then | |
391 self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake | |
392 self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake | |
393 else -- wait until writebuffer is empty | |
394 self:_lock( true, true, false ) | |
395 debug "ssl session delayed until writebuffer is empty..." | |
396 end | |
397 self.starttls = false; | |
398 return true | |
399 end | |
400 | |
401 function interface_mt:setoption(option, value) | |
402 if self.conn.setoption then | |
403 return self.conn:setoption(option, value); | |
404 end | |
405 return false, "setoption not implemented"; | |
406 end | |
407 | |
408 function interface_mt:setlistener(listener) | |
409 self:ondetach(); -- Notify listener that it is no longer responsible for this connection | |
410 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, | |
411 self.onreadtimeout, self.onstatus, self.ondetach | |
412 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, | |
413 listener.onreadtimeout, listener.onstatus, listener.ondetach; | |
414 end | |
415 | |
416 -- Stub handlers | |
417 function interface_mt:onconnect() | |
418 end | |
419 function interface_mt:onincoming() | |
420 end | |
421 function interface_mt:ondisconnect() | |
422 end | |
423 function interface_mt:ontimeout() | |
424 end | |
425 function interface_mt:onreadtimeout() | |
426 self.fatalerror = "timeout during receiving" | |
427 debug( "connection failed:", self.fatalerror ) | |
428 self:_close() | |
429 self.eventread = nil | |
430 end | |
431 function interface_mt:ondrain() | |
432 end | |
433 function interface_mt:ondetach() | |
434 end | |
435 function interface_mt:onstatus() | |
436 end | |
437 | |
438 -- End of client interface methods | |
439 | |
440 local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface | |
441 --vdebug("creating client interfacce...") | |
442 local interface = { | |
443 type = "client"; | |
444 conn = client; | |
445 currenttime = socket_gettime( ); -- safe the origin | |
446 writebuffer = {}; -- writebuffer | |
447 writebufferlen = 0; -- length of writebuffer | |
448 send = client.send; -- caching table lookups | |
449 receive = client.receive; | |
450 onconnect = listener.onconnect; -- will be called when client disconnects | |
451 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
452 onincoming = listener.onincoming; -- will be called when client sends data | |
453 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | |
454 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
455 ondrain = listener.ondrain; -- called when writebuffer is empty | |
456 ondetach = listener.ondetach; -- called when disassociating this listener from this connection | |
457 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | |
458 eventread = false, eventwrite = false, eventclose = false, | |
459 eventhandshake = false, eventstarthandshake = false; -- event handler | |
460 eventconnect = false, eventsession = false; -- more event handler... | |
461 eventwritetimeout = false; -- even more event handler... | |
462 eventreadtimeout = false; | |
463 fatalerror = false; -- error message | |
464 writecallback = false; -- will be called on write events | |
465 readcallback = false; -- will be called on read events | |
466 nointerface = true; -- lock/unlock parameter of this interface | |
467 noreading = false, nowriting = false; -- locks of the read/writecallback | |
468 startsslcallback = false; -- starting handshake callback | |
469 position = false; -- position of client in interfacelist | |
470 | |
471 -- Properties | |
472 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
473 _serverport = (server and server:port() or nil), | |
474 _sslctx = sslctx; -- parameters | |
475 _usingssl = false; -- client is using ssl; | |
476 } | |
477 if not has_luasec then interface.starttls = false; end | |
478 interface.id = tostring(interface):match("%x+$"); | |
479 interface.writecallback = function( event ) -- called on write events | |
480 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
481 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
482 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
483 interface.eventwrite = false | |
484 return -1 | |
485 end | |
486 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
487 interface.fatalerror = "timeout during writing" | |
488 debug( "writing failed:", interface.fatalerror ) | |
489 interface:_close() | |
490 interface.eventwrite = false | |
491 return -1 | |
492 else -- can write :) | |
493 if interface._usingssl then -- handle luasec | |
494 if interface.eventreadtimeout then -- we have to read first | |
495 local ret = interface.readcallback( ) -- call readcallback | |
496 --vdebug( "tried to read in writecallback, result:", ret ) | |
497 end | |
498 if interface.eventwritetimeout then -- luasec only | |
499 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
500 interface.eventwritetimeout = false | |
501 end | |
502 end | |
503 interface.writebuffer = { t_concat(interface.writebuffer) } | |
504 local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) | |
505 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
506 if succ then -- writing succesful | |
507 interface.writebuffer[1] = nil | |
508 interface.writebufferlen = 0 | |
509 interface:ondrain(); | |
510 if interface.fatalerror then | |
511 debug "closing client after writing" | |
512 interface:_close() -- close interface if needed | |
513 elseif interface.startsslcallback then -- start ssl connection if needed | |
514 debug "starting ssl handshake after writing" | |
515 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
516 elseif interface.eventreadtimeout then | |
517 return EV_WRITE, EV_TIMEOUT | |
518 end | |
519 interface.eventwrite = nil | |
520 return -1 | |
521 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again | |
522 --vdebug( "writebuffer is not empty:", err ) | |
523 interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer | |
524 interface.writebufferlen = interface.writebufferlen - byte | |
525 if "wantread" == err then -- happens only with luasec | |
526 local callback = function( ) | |
527 interface:_close() | |
528 interface.eventwritetimeout = nil | |
529 return -1; | |
530 end | |
531 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event | |
532 debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) | |
533 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent | |
548 return -1 | 534 return -1 |
549 end | 535 end |
550 end | 536 return EV_WRITE, cfg.WRITE_TIMEOUT |
551 end | 537 else -- connection was closed during writing or fatal error |
552 | 538 interface.fatalerror = err or "fatal error" |
553 interface.readcallback = function( event ) -- called on read events | 539 debug( "connection failed in write event:", interface.fatalerror ) |
554 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) | 540 interface:_close() |
555 if interface.noreading or interface.fatalerror then -- leave this event | 541 interface.eventwrite = nil |
556 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) | |
557 interface.eventread = nil | |
558 return -1 | 542 return -1 |
559 end | 543 end |
560 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then | 544 end |
561 return -1 -- took too long to get some data from client -> disconnect | 545 end |
562 end | 546 |
563 if interface._usingssl then -- handle luasec | 547 interface.readcallback = function( event ) -- called on read events |
564 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | 548 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) |
565 local ret = interface.writecallback( ) -- call it | 549 if interface.noreading or interface.fatalerror then -- leave this event |
566 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | 550 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) |
551 interface.eventread = nil | |
552 return -1 | |
553 end | |
554 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then | |
555 return -1 -- took too long to get some data from client -> disconnect | |
556 end | |
557 if interface._usingssl then -- handle luasec | |
558 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | |
559 local ret = interface.writecallback( ) -- call it | |
560 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | |
561 end | |
562 if interface.eventreadtimeout then | |
563 interface.eventreadtimeout:close( ) | |
564 interface.eventreadtimeout = nil | |
565 end | |
566 end | |
567 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | |
568 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | |
569 buffer = buffer or part | |
570 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | |
571 interface.fatalerror = "receive buffer exceeded" | |
572 debug( "fatal error:", interface.fatalerror ) | |
573 interface:_close() | |
574 interface.eventread = nil | |
575 return -1 | |
576 end | |
577 if err and ( err ~= "timeout" and err ~= "wantread" ) then | |
578 if "wantwrite" == err then -- need to read on write event | |
579 if not interface.eventwrite then -- register new write event if needed | |
580 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | |
567 end | 581 end |
568 if interface.eventreadtimeout then | 582 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, |
569 interface.eventreadtimeout:close( ) | 583 function( ) |
570 interface.eventreadtimeout = nil | 584 interface:_close() |
571 end | 585 end, cfg.READ_TIMEOUT |
572 end | 586 ) |
573 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | 587 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) |
574 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | 588 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... |
575 buffer = buffer or part | 589 else -- connection was closed or fatal error |
576 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | 590 interface.fatalerror = err |
577 interface.fatalerror = "receive buffer exceeded" | 591 debug( "connection failed in read event:", interface.fatalerror ) |
578 debug( "fatal error:", interface.fatalerror ) | |
579 interface:_close() | 592 interface:_close() |
580 interface.eventread = nil | 593 interface.eventread = nil |
581 return -1 | 594 return -1 |
582 end | 595 end |
583 if err and ( err ~= "timeout" and err ~= "wantread" ) then | 596 else |
584 if "wantwrite" == err then -- need to read on write event | 597 interface.onincoming( interface, buffer, err ) -- send new data to listener |
585 if not interface.eventwrite then -- register new write event if needed | 598 end |
586 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | 599 if interface.noreading then |
587 end | 600 interface.eventread = nil; |
588 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, | 601 return -1; |
589 function( ) | 602 end |
590 interface:_close() | 603 return EV_READ, cfg.READ_TIMEOUT |
591 end, cfg.READ_TIMEOUT | 604 end |
592 ) | 605 |
593 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) | 606 client:settimeout( 0 ) -- set non blocking |
594 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... | 607 setmetatable(interface, interface_mt) |
595 else -- connection was closed or fatal error | 608 interfacelist[ interface ] = true -- add to interfacelist |
596 interface.fatalerror = err | 609 return interface |
597 debug( "connection failed in read event:", interface.fatalerror ) | 610 end |
598 interface:_close() | 611 |
599 interface.eventread = nil | 612 local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface |
600 return -1 | 613 debug "creating server interface..." |
601 end | 614 local interface = { |
615 _connections = 0; | |
616 | |
617 conn = server; | |
618 onconnect = listener.onconnect; -- will be called when new client connected | |
619 eventread = false; -- read event handler | |
620 eventclose = false; -- close event handler | |
621 readcallback = false; -- read event callback | |
622 fatalerror = false; -- error message | |
623 nointerface = true; -- lock/unlock parameter | |
624 | |
625 _ip = addr, _port = port, _pattern = pattern, | |
626 _sslctx = sslctx; | |
627 } | |
628 interface.id = tostring(interface):match("%x+$"); | |
629 interface.readcallback = function( event ) -- server handler, called on incoming connections | |
630 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | |
631 if interface.fatalerror then | |
632 --vdebug( "leaving this event because:", self.fatalerror ) | |
633 interface.eventread = nil | |
634 return -1 | |
635 end | |
636 local delay = cfg.ACCEPT_DELAY | |
637 if EV_TIMEOUT == event then | |
638 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | |
639 debug( "to many connections, seconds to wait for next accept:", delay ) | |
640 return EV_TIMEOUT, delay -- timeout... | |
602 else | 641 else |
603 interface.onincoming( interface, buffer, err ) -- send new data to listener | 642 return EV_READ -- accept again |
604 end | 643 end |
605 if interface.noreading then | 644 end |
606 interface.eventread = nil; | 645 --vdebug("max connection check ok, accepting...") |
607 return -1; | 646 local client, err = server:accept() -- try to accept; TODO: check err |
608 end | 647 while client do |
609 return EV_READ, cfg.READ_TIMEOUT | 648 if interface._connections >= cfg.MAX_CONNECTIONS then |
610 end | 649 client:close( ) -- refuse connection |
611 | 650 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) |
612 client:settimeout( 0 ) -- set non blocking | 651 return EV_TIMEOUT, delay -- delay for next accept attempt |
613 setmetatable(interface, interface_mt) | 652 end |
614 interfacelist[ interface ] = true -- add to interfacelist | 653 local client_ip, client_port = client:getpeername( ) |
615 return interface | 654 interface._connections = interface._connections + 1 -- increase connection count |
616 end | 655 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) |
617 end | 656 --vdebug( "client id:", clientinterface, "startssl:", startssl ) |
618 | 657 if has_luasec and sslctx then |
619 local handleserver | 658 clientinterface:starttls(sslctx, true) |
620 do | |
621 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface | |
622 debug "creating server interface..." | |
623 local interface = { | |
624 _connections = 0; | |
625 | |
626 conn = server; | |
627 onconnect = listener.onconnect; -- will be called when new client connected | |
628 eventread = false; -- read event handler | |
629 eventclose = false; -- close event handler | |
630 readcallback = false; -- read event callback | |
631 fatalerror = false; -- error message | |
632 nointerface = true; -- lock/unlock parameter | |
633 | |
634 _ip = addr, _port = port, _pattern = pattern, | |
635 _sslctx = sslctx; | |
636 } | |
637 interface.id = tostring(interface):match("%x+$"); | |
638 interface.readcallback = function( event ) -- server handler, called on incoming connections | |
639 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | |
640 if interface.fatalerror then | |
641 --vdebug( "leaving this event because:", self.fatalerror ) | |
642 interface.eventread = nil | |
643 return -1 | |
644 end | |
645 local delay = cfg.ACCEPT_DELAY | |
646 if EV_TIMEOUT == event then | |
647 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | |
648 debug( "to many connections, seconds to wait for next accept:", delay ) | |
649 return EV_TIMEOUT, delay -- timeout... | |
650 else | |
651 return EV_READ -- accept again | |
652 end | |
653 end | |
654 --vdebug("max connection check ok, accepting...") | |
655 local client, err = server:accept() -- try to accept; TODO: check err | |
656 while client do | |
657 if interface._connections >= cfg.MAX_CONNECTIONS then | |
658 client:close( ) -- refuse connection | |
659 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) | |
660 return EV_TIMEOUT, delay -- delay for next accept attempt | |
661 end | |
662 local client_ip, client_port = client:getpeername( ) | |
663 interface._connections = interface._connections + 1 -- increase connection count | |
664 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) | |
665 --vdebug( "client id:", clientinterface, "startssl:", startssl ) | |
666 if has_luasec and sslctx then | |
667 clientinterface:starttls(sslctx, true) | |
668 else | |
669 clientinterface:_start_session( true ) | |
670 end | |
671 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); | |
672 | |
673 client, err = server:accept() -- try to accept again | |
674 end | |
675 return EV_READ | |
676 end | |
677 | |
678 server:settimeout( 0 ) | |
679 setmetatable(interface, interface_mt) | |
680 interfacelist[ interface ] = true | |
681 interface:_start_session() | |
682 return interface | |
683 end | |
684 end | |
685 | |
686 local addserver = ( function( ) | |
687 return function( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments | |
688 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") | |
689 if sslctx and not has_luasec then | |
690 debug "fatal error: luasec not found" | |
691 return nil, "luasec not found" | |
692 end | |
693 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket | |
694 if not server then | |
695 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) | |
696 return nil, err | |
697 end | |
698 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler | |
699 debug( "new server created with id:", tostring(interface)) | |
700 return interface | |
701 end | |
702 end )( ) | |
703 | |
704 local addclient, wrapclient | |
705 do | |
706 function wrapclient( client, ip, port, listeners, pattern, sslctx ) | |
707 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) | |
708 interface:_start_connection(sslctx) | |
709 return interface, client | |
710 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | |
711 end | |
712 | |
713 function addclient( addr, serverport, listener, pattern, sslctx, typ ) | |
714 if sslctx and not has_luasec then | |
715 debug "need luasec, but not available" | |
716 return nil, "luasec not found" | |
717 end | |
718 if not typ then | |
719 local addrinfo, err = getaddrinfo(addr) | |
720 if not addrinfo then return nil, err end | |
721 if addrinfo[1] and addrinfo[1].family == "inet6" then | |
722 typ = "tcp6" | |
723 else | 659 else |
724 typ = "tcp" | 660 clientinterface:_start_session( true ) |
725 end | 661 end |
726 end | 662 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); |
727 local create = socket[typ] | 663 |
728 if type( create ) ~= "function" then | 664 client, err = server:accept() -- try to accept again |
729 return nil, "invalid socket type" | 665 end |
730 end | 666 return EV_READ |
731 local client, err = create() -- creating new socket | 667 end |
732 if not client then | 668 |
733 debug( "cannot create socket:", err ) | 669 server:settimeout( 0 ) |
734 return nil, err | 670 setmetatable(interface, interface_mt) |
735 end | 671 interfacelist[ interface ] = true |
736 client:settimeout( 0 ) -- set nonblocking | 672 interface:_start_session() |
737 local res, err = client:connect( addr, serverport ) -- connect | 673 return interface |
738 if res or ( err == "timeout" ) then | 674 end |
739 local ip, port = client:getsockname( ) | 675 |
740 local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) | 676 local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments |
741 interface:_start_connection( startssl ) | 677 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") |
742 debug( "new connection id:", interface.id ) | 678 if sslctx and not has_luasec then |
743 return interface, err | 679 debug "fatal error: luasec not found" |
680 return nil, "luasec not found" | |
681 end | |
682 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket | |
683 if not server then | |
684 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) | |
685 return nil, err | |
686 end | |
687 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler | |
688 debug( "new server created with id:", tostring(interface)) | |
689 return interface | |
690 end | |
691 | |
692 local function wrapclient( client, ip, port, listeners, pattern, sslctx ) | |
693 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) | |
694 interface:_start_connection(sslctx) | |
695 return interface, client | |
696 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | |
697 end | |
698 | |
699 local function addclient( addr, serverport, listener, pattern, sslctx, typ ) | |
700 if sslctx and not has_luasec then | |
701 debug "need luasec, but not available" | |
702 return nil, "luasec not found" | |
703 end | |
704 if not typ then | |
705 local addrinfo, err = getaddrinfo(addr) | |
706 if not addrinfo then return nil, err end | |
707 if addrinfo[1] and addrinfo[1].family == "inet6" then | |
708 typ = "tcp6" | |
744 else | 709 else |
745 debug( "new connection failed:", err ) | 710 typ = "tcp" |
746 return nil, err | 711 end |
747 end | 712 end |
748 end | 713 local create = socket[typ] |
749 end | 714 if type( create ) ~= "function" then |
750 | 715 return nil, "invalid socket type" |
751 | 716 end |
752 local loop = function( ) -- starts the event loop | 717 local client, err = create() -- creating new socket |
718 if not client then | |
719 debug( "cannot create socket:", err ) | |
720 return nil, err | |
721 end | |
722 client:settimeout( 0 ) -- set nonblocking | |
723 local res, err = client:connect( addr, serverport ) -- connect | |
724 if res or ( err == "timeout" ) then | |
725 local ip, port = client:getsockname( ) | |
726 local interface = wrapclient( client, ip, serverport, listener, pattern, sslctx ) | |
727 interface:_start_connection( startssl ) | |
728 debug( "new connection id:", interface.id ) | |
729 return interface, err | |
730 else | |
731 debug( "new connection failed:", err ) | |
732 return nil, err | |
733 end | |
734 end | |
735 | |
736 local function loop( ) -- starts the event loop | |
753 base:loop( ) | 737 base:loop( ) |
754 return "quitting"; | 738 return "quitting"; |
755 end | 739 end |
756 | 740 |
757 local function newevent( ... ) | 741 local function newevent( ... ) |
758 return addevent( base, ... ) | 742 return addevent( base, ... ) |
759 end | 743 end |
760 | 744 |
761 local closeallservers = function( arg ) | 745 local function closeallservers ( arg ) |
762 for item in pairs( interfacelist ) do | 746 for item in pairs( interfacelist ) do |
763 if item.type == "server" then | 747 if item.type == "server" then |
764 item:close( arg ) | 748 item:close( arg ) |
765 end | 749 end |
766 end | 750 end |
812 end | 796 end |
813 sender:set_mode("*a"); | 797 sender:set_mode("*a"); |
814 end | 798 end |
815 | 799 |
816 return { | 800 return { |
817 | |
818 cfg = cfg, | 801 cfg = cfg, |
819 base = base, | 802 base = base, |
820 loop = loop, | 803 loop = loop, |
821 link = link, | 804 link = link, |
822 event = event, | 805 event = event, |