Software /
code /
prosody
Comparison
net/server_event.lua @ 2091:6fd686a45806
net.server_event: Initial commit of server_event.lua. Don't get too excited, it's not used at all yet, and is still incomplete :)
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Fri, 20 Nov 2009 22:58:56 +0000 |
child | 2092:28c0b313b610 |
comparison
equal
deleted
inserted
replaced
2090:7810648ea26d | 2091:6fd686a45806 |
---|---|
1 --[[ | |
2 | |
3 | |
4 server.lua based on lua/libevent by blastbeat | |
5 | |
6 notes: | |
7 -- when using luaevent, never register 2 or more EV_READ at one socket, same for EV_WRITE | |
8 -- you cant even register a new EV_READ/EV_WRITE callback inside another one | |
9 -- never call eventcallback:close( ) from inside eventcallback | |
10 -- to do some of the above, use timeout events or something what will called from outside | |
11 -- dont let garbagecollect eventcallbacks, as long they are running | |
12 -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing | |
13 | |
14 --]] | |
15 | |
16 | |
17 local SCRIPT_NAME = "server_event.lua" | |
18 local SCRIPT_VERSION = "0.05" | |
19 local SCRIPT_AUTHOR = "blastbeat" | |
20 local LAST_MODIFIED = "2009/11/20" | |
21 | |
22 local cfg = { | |
23 MAX_CONNECTIONS = 100000, -- max per server connections (use "ulimit -n" on *nix) | |
24 MAX_HANDSHAKE_ATTEMPS = 10, -- attemps to finish ssl handshake | |
25 HANDSHAKE_TIMEOUT = 1, -- timout in seconds per handshake attemp | |
26 MAX_READ_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes allowed to read from sockets | |
27 MAX_SEND_LENGTH = 1024 * 1024 * 1024 * 1024, -- max bytes size of write buffer (for writing on sockets) | |
28 ACCEPT_DELAY = 10, -- seconds to wait until the next attemp of a full server to accept | |
29 READ_TIMEOUT = 60 * 30, -- timeout in seconds for read data from socket | |
30 WRITE_TIMEOUT = 30, -- timeout in seconds for write data on socket | |
31 CONNECT_TIMEOUT = 10, -- timeout in seconds for connection attemps | |
32 CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) | |
33 DEBUG = true, -- show debug messages | |
34 } | |
35 | |
36 local function use(x) return rawget(_G, x); end | |
37 local print = use "print" | |
38 local pcall = use "pcall" | |
39 local ipairs = use "ipairs" | |
40 local string = use "string" | |
41 local select = use "select" | |
42 local require = use "require" | |
43 local tostring = use "tostring" | |
44 local coroutine = use "coroutine" | |
45 local setmetatable = use "setmetatable" | |
46 | |
47 local ssl = use "ssl" | |
48 local socket = use "socket" | |
49 | |
50 local log = require ("util.logger").init("socket") | |
51 | |
52 local function debug(...) | |
53 return log("debug", ("%s "):rep(select('#', ...)), ...) | |
54 end | |
55 | |
56 local bitor = ( function( ) -- thx Rici Lake | |
57 local hasbit = function( x, p ) | |
58 return x % ( p + p ) >= p | |
59 end | |
60 return function( x, y ) | |
61 local p = 1 | |
62 local z = 0 | |
63 local limit = x > y and x or y | |
64 while p <= limit do | |
65 if hasbit( x, p ) or hasbit( y, p ) then | |
66 z = z + p | |
67 end | |
68 p = p + p | |
69 end | |
70 return z | |
71 end | |
72 end )( ) | |
73 | |
74 local getid = function( ) | |
75 return function( ) | |
76 end | |
77 end | |
78 | |
79 local event = require "luaevent.core" | |
80 local base = event.new( ) | |
81 local EV_READ = event.EV_READ | |
82 local EV_WRITE = event.EV_WRITE | |
83 local EV_TIMEOUT = event.EV_TIMEOUT | |
84 | |
85 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) | |
86 | |
87 local interfacelist = ( function( ) -- holds the interfaces for sockets | |
88 local array = { } | |
89 local len = 0 | |
90 return function( method, arg ) | |
91 if "add" == method then | |
92 len = len + 1 | |
93 array[ len ] = arg | |
94 arg:_position( len ) | |
95 return len | |
96 elseif "delete" == method then | |
97 if len <= 0 then | |
98 return nil, "array is already empty" | |
99 end | |
100 local position = arg:_position() -- get position in array | |
101 if position ~= len then | |
102 local interface = array[ len ] -- get last interface | |
103 array[ position ] = interface -- copy it into free position | |
104 array[ len ] = nil -- free last position | |
105 interface:_position( position ) -- set new position in array | |
106 else -- free last position | |
107 array[ len ] = nil | |
108 end | |
109 len = len - 1 | |
110 return len | |
111 else | |
112 return array | |
113 end | |
114 end | |
115 end )( ) | |
116 | |
117 -- Client interface methods | |
118 local interface_mt | |
119 do | |
120 interface_mt = {}; interface_mt.__index = interface_mt; | |
121 | |
122 local addevent = base.addevent | |
123 local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield | |
124 local string_len = string.len | |
125 | |
126 -- Private methods | |
127 function interface_mt:_position(new_position) | |
128 self.position = new_position or self.position | |
129 return self.position; | |
130 end | |
131 function interface_mt:_close() -- regs event to start self:_destroy() | |
132 local callback = function( ) | |
133 self:_destroy(); | |
134 self.eventclose = nil | |
135 return -1 | |
136 end | |
137 self.eventclose = addevent( base, nil, EV_TIMEOUT, callback, 0 ) | |
138 return true | |
139 end | |
140 | |
141 function interface_mt:_start_connection(plainssl) -- should be called from addclient | |
142 local callback = function( event ) | |
143 if EV_TIMEOUT == event then -- timout during connection | |
144 self.fatalerror = "connection timeout" | |
145 self.listener.ontimeout( self ) -- call timeout listener | |
146 self:_close() | |
147 debug( "new connection failed. id:", self, "error:", self.fatalerror ) | |
148 else | |
149 if plainssl then -- start ssl session | |
150 self:_start_ssl( self.listener.onconnect ) | |
151 else -- normal connection | |
152 self:_start_session( self.listener.onconnect ) | |
153 end | |
154 debug( "new connection established. id:", self ) | |
155 end | |
156 self.eventconnect = nil | |
157 return -1 | |
158 end | |
159 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) | |
160 return true | |
161 end | |
162 function interface_mt:_start_session(onconnect) -- new session, for example after startssl | |
163 if self.type == "client" then | |
164 local callback = function( ) | |
165 self:_lock( false, false, false ) | |
166 --vdebug( "start listening on client socket with id:", self ) | |
167 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ) -- register callback | |
168 onconnect( self ) | |
169 self.eventsession = nil | |
170 return -1 | |
171 end | |
172 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) | |
173 else | |
174 self:_lock( false ) | |
175 --vdebug( "start listening on server socket with id:", self ) | |
176 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback | |
177 end | |
178 return true | |
179 end | |
180 function interface_mt:_start_ssl(arg) -- old socket will be destroyed, therefore we have to close read/write events first | |
181 --vdebug( "starting ssl session with client id:", self ) | |
182 local _ | |
183 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! | |
184 _ = self.eventwrite and self.eventwrite:close( ) | |
185 self.eventread, self.eventwrite = nil, nil | |
186 local err | |
187 self.conn, err = ssl.wrap( self.conn, self.sslctx ) | |
188 if err then | |
189 self.fatalerror = err | |
190 self.conn = nil -- cannot be used anymore | |
191 if "onconnect" == arg then | |
192 self.ondisconnect = nil -- dont call this when client isnt really connected | |
193 end | |
194 self:_close() | |
195 debug( "fatal error while ssl wrapping:", err ) | |
196 return false | |
197 end | |
198 self.conn:settimeout( 0 ) -- set non blocking | |
199 local handshakecallback = coroutine_wrap( | |
200 function( event ) | |
201 local _, err | |
202 local attempt = 0 | |
203 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPS | |
204 while attempt < 1000 do -- no endless loop | |
205 attempt = attempt + 1 | |
206 debug( "ssl handshake of client with id:", self, "attemp:", attempt ) | |
207 if attempt > maxattempt then | |
208 self.fatalerror = "max handshake attemps exceeded" | |
209 elseif EV_TIMEOUT == event then | |
210 self.fatalerror = "timeout during handshake" | |
211 else | |
212 _, err = self.conn:dohandshake( ) | |
213 if not err then | |
214 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | |
215 self.send = self.conn.send -- caching table lookups with new client object | |
216 self.receive = self.conn.receive | |
217 local onsomething | |
218 if "onconnect" == arg then -- trigger listener | |
219 onsomething = self.listener.onconnect | |
220 else | |
221 onsomething = self.listener.onsslconnection | |
222 end | |
223 self:_start_session( onsomething ) | |
224 debug( "ssl handshake done" ) | |
225 self.eventhandshake = nil | |
226 return -1 | |
227 end | |
228 debug( "error during ssl handshake:", err ) | |
229 if err == "wantwrite" then | |
230 event = EV_WRITE | |
231 elseif err == "wantread" then | |
232 event = EV_READ | |
233 else | |
234 self.fatalerror = err | |
235 end | |
236 end | |
237 if self.fatalerror then | |
238 if "onconnect" == arg then | |
239 self.ondisconnect = nil -- dont call this when client isnt really connected | |
240 end | |
241 self:_close() | |
242 debug( "handshake failed because:", self.fatalerror ) | |
243 self.eventhandshake = nil | |
244 return -1 | |
245 end | |
246 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... | |
247 end | |
248 end | |
249 ) | |
250 debug "starting handshake..." | |
251 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked | |
252 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) | |
253 return true | |
254 end | |
255 function interface_mt:_destroy() -- close this interface + events and call last listener | |
256 debug( "closing client with id:", self ) | |
257 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions | |
258 local _ | |
259 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! | |
260 if self.type == "client" then | |
261 _ = self.eventwrite and self.eventwrite:close( ) | |
262 _ = self.eventhandshake and self.eventhandshake:close( ) | |
263 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) | |
264 _ = self.eventconnect and self.eventconnect:close( ) | |
265 _ = self.eventsession and self.eventsession:close( ) | |
266 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) | |
267 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) | |
268 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ) -- call ondisconnect listener (wont be the case if handshake failed on connect) | |
269 _ = self.conn and self.conn:close( ) -- close connection, must also be called outside of any socket registered events! | |
270 self._server:counter(-1); | |
271 self.eventread, self.eventwrite = nil, nil | |
272 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil | |
273 self.readcallback, self.writecallback = nil, nil | |
274 else | |
275 self.conn:close( ) | |
276 self.eventread, self.eventclose = nil, nil | |
277 self.interface, self.readcallback = nil, nil | |
278 end | |
279 interfacelist( "delete", self ) | |
280 return true | |
281 end | |
282 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | |
283 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | |
284 return nointerface, noreading, nowriting | |
285 end | |
286 | |
287 function interface_mt:counter(c) | |
288 if c then | |
289 self._connections = self._connections - c | |
290 end | |
291 return self._connections | |
292 end | |
293 | |
294 -- Public methods | |
295 function interface_mt:write(data) | |
296 --vdebug( "try to send data to client, id/data:", self, data ) | |
297 data = tostring( data ) | |
298 local len = string_len( data ) | |
299 local total = len + self.writebufferlen | |
300 if total > cfg.MAX_SEND_LENGTH then -- check buffer length | |
301 local err = "send buffer exceeded" | |
302 debug( "error:", err ) -- to much, check your app | |
303 return nil, err | |
304 end | |
305 self.writebuffer = self.writebuffer .. data -- new buffer | |
306 self.writebufferlen = total | |
307 if not self.eventwrite then -- register new write event | |
308 --vdebug( "register new write event" ) | |
309 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) | |
310 end | |
311 return true | |
312 end | |
313 function interface_mt:close(now) | |
314 debug( "try to close client connection with id:", self ) | |
315 if self.type == "client" then | |
316 self.fatalerror = "client to close" | |
317 if ( not self.eventwrite ) or now then -- try to close immediately | |
318 self:_lock( true, true, true ) | |
319 self:_close() | |
320 return true | |
321 else -- wait for incomplete write request | |
322 self:_lock( true, true, false ) | |
323 debug "closing delayed until writebuffer is empty" | |
324 return nil, "writebuffer not empty, waiting" | |
325 end | |
326 else | |
327 debug( "try to close server with id:", self, "args:", now ) | |
328 self.fatalerror = "server to close" | |
329 self:_lock( true ) | |
330 local count = 0 | |
331 for _, item in ipairs( interfacelist( ) ) do | |
332 if ( item.type ~= "server" ) and ( item._server == self ) then -- client/server match | |
333 if item:close( now ) then -- writebuffer was empty | |
334 count = count + 1 | |
335 end | |
336 end | |
337 end | |
338 local timeout = 0 -- dont wait for unfinished writebuffers of clients... | |
339 if not now then | |
340 timeout = cfg.WRITE_TIMEOUT -- ...or wait for it | |
341 end | |
342 self:_close( timeout ) -- add new event to remove the server interface | |
343 debug( "seconds remained until server is closed:", timeout ) | |
344 return count -- returns finished clients with empty writebuffer | |
345 end | |
346 end | |
347 | |
348 function interface_mt:server() | |
349 return self._server or self; | |
350 end | |
351 | |
352 function interface_mt:port() | |
353 return self._port | |
354 end | |
355 | |
356 function interface_mt:ip() | |
357 return self._ip | |
358 end | |
359 | |
360 function interface_mt:ssl() | |
361 return self.usingssl | |
362 end | |
363 | |
364 function interface_mt:type() | |
365 return self._type or "client" | |
366 end | |
367 | |
368 function interface_mt:connections() | |
369 return self._connections | |
370 end | |
371 | |
372 function interface_mt:address() | |
373 return self.addr | |
374 end | |
375 | |
376 | |
377 | |
378 function interface_mt:starttls() | |
379 debug( "try to start ssl at client id:", self ) | |
380 local err | |
381 if not self.sslctx then -- no ssl available | |
382 err = "no ssl context available" | |
383 elseif self.usingssl then -- startssl was already called | |
384 err = "ssl already active" | |
385 end | |
386 if err then | |
387 debug( "error:", err ) | |
388 return nil, err | |
389 end | |
390 self.usingssl = true | |
391 self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event | |
392 self:_start_ssl(); | |
393 self.eventstarthandshake = nil | |
394 return -1 | |
395 end | |
396 if not self.eventwrite then | |
397 self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake | |
398 self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake | |
399 else -- wait until writebuffer is empty | |
400 self:_lock( true, true, false ) | |
401 debug "ssl session delayed until writebuffer is empty..." | |
402 end | |
403 return true | |
404 end | |
405 | |
406 function interface_mt.onconnect() | |
407 end | |
408 end | |
409 | |
410 -- End of client interface methods | |
411 | |
412 local handleclient; | |
413 do | |
414 local string_sub = string.sub -- caching table lookups | |
415 local string_len = string.len | |
416 local addevent = base.addevent | |
417 local coroutine_wrap = coroutine.wrap | |
418 local socket_gettime = socket.gettime | |
419 local coroutine_yield = coroutine.yield | |
420 function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | |
421 --vdebug("creating client interfacce...") | |
422 local interface = { | |
423 type = "client"; | |
424 conn = client; | |
425 currenttime = socket_gettime( ); -- safe the origin | |
426 writebuffer = ""; -- writebuffer | |
427 writebufferlen = 0; -- length of writebuffer | |
428 send = client.send; -- caching table lookups | |
429 receive = client.receive; | |
430 onconnect = listener.onconnect; -- will be called when client disconnects | |
431 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
432 onincoming = listener.onincoming; -- will be called when client sends data | |
433 eventread = false, eventwrite = false, eventclose = false, | |
434 eventhandshake = false, eventstarthandshake = false; -- event handler | |
435 eventconnect = false, eventsession = false; -- more event handler... | |
436 eventwritetimeout = false; -- even more event handler... | |
437 eventreadtimeout = false; | |
438 fatalerror = false; -- error message | |
439 writecallback = false; -- will be called on write events | |
440 readcallback = false; -- will be called on read events | |
441 nointerface = true; -- lock/unlock parameter of this interface | |
442 noreading = false, nowriting = false; -- locks of the read/writecallback | |
443 startsslcallback = false; -- starting handshake callback | |
444 position = false; -- position of client in interfacelist | |
445 | |
446 -- Properties | |
447 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
448 _sslctx = sslctx; -- parameters | |
449 _usingssl = false; -- client is using ssl; | |
450 } | |
451 interface.writecallback = function( event ) -- called on write events | |
452 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
453 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
454 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
455 interface.eventwrite = false | |
456 return -1 | |
457 end | |
458 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
459 interface.fatalerror = "timeout during writing" | |
460 debug( "writing failed:", interface.fatalerror ) | |
461 interface:_close() | |
462 interface.eventwrite = false | |
463 return -1 | |
464 else -- can write :) | |
465 if interface.usingssl then -- handle luasec | |
466 if interface.eventreadtimeout then -- we have to read first | |
467 local ret = interface.readcallback( ) -- call readcallback | |
468 --vdebug( "tried to read in writecallback, result:", ret ) | |
469 end | |
470 if interface.eventwritetimeout then -- luasec only | |
471 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
472 interface.eventwritetimeout = false | |
473 end | |
474 end | |
475 local succ, err, byte = interface.send( interface.conn, interface.writebuffer, 1, interface.writebufferlen ) | |
476 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
477 if succ then -- writing succesful | |
478 interface.writebuffer = "" | |
479 interface.writebufferlen = 0 | |
480 if interface.fatalerror then | |
481 debug "closing client after writing" | |
482 interface:_close() -- close interface if needed | |
483 elseif interface.startsslcallback then -- start ssl connection if needed | |
484 debug "starting ssl handshake after writing" | |
485 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
486 elseif interface.eventreadtimeout then | |
487 return EV_WRITE, EV_TIMEOUT | |
488 end | |
489 interface.eventwrite = nil | |
490 return -1 | |
491 elseif byte then -- want write again | |
492 --vdebug( "writebuffer is not empty:", err ) | |
493 interface.writebuffer = string_sub( interface.writebuffer, byte + 1, interface.writebufferlen ) -- new buffer | |
494 interface.writebufferlen = interface.writebufferlen - byte | |
495 if "wantread" == err then -- happens only with luasec | |
496 local callback = function( ) | |
497 interface:_close() | |
498 interface.eventwritetimeout = nil | |
499 return evreturn, evtimeout | |
500 end | |
501 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event | |
502 debug( "wantread during write attemp, reg it in readcallback but dont know what really happens next..." ) | |
503 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent | |
504 return -1 | |
505 end | |
506 return EV_WRITE, cfg.WRITE_TIMEOUT | |
507 else -- connection was closed during writing or fatal error | |
508 interface.fatalerror = err or "fatal error" | |
509 debug( "connection failed in write event:", interface.fatalerror ) | |
510 interface:_close() | |
511 interface.eventwrite = nil | |
512 return -1 | |
513 end | |
514 end | |
515 end | |
516 local usingssl, receive = interface._usingssl, interface.receive; | |
517 interface.readcallback = function( event ) -- called on read events | |
518 --vdebug( "new client read event, id/ip/port:", interface, ip, port ) | |
519 if interface.noreading or interface.fatalerror then -- leave this event | |
520 --vdebug( "leaving this event because:", interface.noreading or interface.fatalerror ) | |
521 interface.eventread = nil | |
522 return -1 | |
523 end | |
524 if EV_TIMEOUT == event then -- took too long to get some data from client -> disconnect | |
525 interface.fatalerror = "timeout during receiving" | |
526 debug( "connection failed:", interface.fatalerror ) | |
527 interface:_close() | |
528 interface.eventread = nil | |
529 return -1 | |
530 else -- can read | |
531 if usingssl then -- handle luasec | |
532 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | |
533 local ret = interface.writecallback( ) -- call it | |
534 --vdebug( "tried to write in readcallback, result:", ret ) | |
535 end | |
536 if interface.eventreadtimeout then | |
537 interface.eventreadtimeout:close( ) | |
538 interface.eventreadtimeout = nil | |
539 end | |
540 end | |
541 local buffer, err, part = receive( client, pattern ) -- receive buffer with "pattern" | |
542 --vdebug( "read data:", buffer, "error:", err, "part:", part ) | |
543 buffer = buffer or part or "" | |
544 local len = string_len( buffer ) | |
545 if len > cfg.MAX_READ_LENGTH then -- check buffer length | |
546 interface.fatalerror = "receive buffer exceeded" | |
547 debug( "fatal error:", interface.fatalerror ) | |
548 interface:_close() | |
549 interface.eventread = nil | |
550 return -1 | |
551 end | |
552 if err and ( "timeout" ~= err ) then | |
553 if "wantwrite" == err then -- need to read on write event | |
554 if not interface.eventwrite then -- register new write event if needed | |
555 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | |
556 end | |
557 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, | |
558 function( ) | |
559 interface:_close() | |
560 end, cfg.READ_TIMEOUT | |
561 ) | |
562 debug( "wantwrite during read attemp, reg it in writecallback but dont know what really happens next..." ) | |
563 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... | |
564 else -- connection was closed or fatal error | |
565 interface.fatalerror = err | |
566 debug( "connection failed in read event:", interface.fatalerror ) | |
567 interface:_close() | |
568 interface.eventread = nil | |
569 return -1 | |
570 end | |
571 end | |
572 interface.onincoming( interface, buffer, err ) -- send new data to listener | |
573 return EV_READ, cfg.READ_TIMEOUT | |
574 end | |
575 end | |
576 | |
577 client:settimeout( 0 ) -- set non blocking | |
578 setmetatable(interface, interface_mt) | |
579 interfacelist( "add", interface ) -- add to interfacelist | |
580 return interface | |
581 end | |
582 end | |
583 | |
584 local handleserver | |
585 do | |
586 function handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- creates an server interface | |
587 debug "creating server interface..." | |
588 local interface = { | |
589 _connections = 0; | |
590 | |
591 conn = server; | |
592 onconnect = listener.onconnect; -- will be called when new client connected | |
593 eventread = false; -- read event handler | |
594 eventclose = false; -- close event handler | |
595 readcallback = false; -- read event callback | |
596 fatalerror = false; -- error message | |
597 nointerface = true; -- lock/unlock parameter | |
598 } | |
599 interface.readcallback = function( event ) -- server handler, called on incoming connections | |
600 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | |
601 if interface.fatalerror then | |
602 --vdebug( "leaving this event because:", self.fatalerror ) | |
603 interface.eventread = nil | |
604 return -1 | |
605 end | |
606 local delay = cfg.ACCEPT_DELAY | |
607 if EV_TIMEOUT == event then | |
608 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | |
609 debug( "to many connections, seconds to wait for next accept:", delay ) | |
610 return EV_TIMEOUT, delay -- timeout... | |
611 else | |
612 return EV_READ -- accept again | |
613 end | |
614 end | |
615 --vdebug("max connection check ok, accepting...") | |
616 local client, err = server:accept() -- try to accept; TODO: check err | |
617 while client do | |
618 if interface._connections >= cfg.MAX_CONNECTIONS then | |
619 client:close( ) -- refuse connection | |
620 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) | |
621 return EV_TIMEOUT, delay -- delay for next accept attemp | |
622 end | |
623 local ip, port = client:getpeername( ) | |
624 interface._connections = interface._connections + 1 -- increase connection count | |
625 local clientinterface = handleclient( client, ip, port, interface, pattern, listener, nil, sslctx ) | |
626 --vdebug( "client id:", clientinterface, "startssl:", startssl ) | |
627 if startssl then | |
628 clientinterface:_start_ssl( clientinterface.onconnect ) | |
629 else | |
630 clientinterface:_start_session( clientinterface.onconnect ) | |
631 end | |
632 debug( "accepted incoming client connection from:", ip, port ) | |
633 client, err = server:accept() -- try to accept again | |
634 end | |
635 return EV_READ | |
636 end | |
637 | |
638 server:settimeout( 0 ) | |
639 setmetatable(interface, interface_mt) | |
640 interfacelist( "add", interface ) | |
641 interface:_start_session() | |
642 return interface | |
643 end | |
644 end | |
645 | |
646 local addserver = ( function( ) | |
647 return function( addr, port, listener, pattern, backlog, sslcfg, startssl ) -- TODO: check arguments | |
648 debug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslcfg or "nil", startssl or "nil") | |
649 local server, err = socket.bind( addr, port, backlog ) -- create server socket | |
650 if not server then | |
651 debug( "creating server socket failed because:", err ) | |
652 return nil, err | |
653 end | |
654 local sslctx | |
655 if sslcfg then | |
656 if not ssl then | |
657 debug "fatal error: luasec not found" | |
658 return nil, "luasec not found" | |
659 end | |
660 sslctx, err = ssl.newcontext( sslcfg ) | |
661 if err then | |
662 debug( "error while creating new ssl context for server socket:", err ) | |
663 return nil, err | |
664 end | |
665 end | |
666 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler | |
667 debug( "new server created with id:", tostring(interface)) | |
668 return interface | |
669 end | |
670 end )( ) | |
671 | |
672 local wrapclient = ( function( ) | |
673 return function( client, addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) | |
674 debug( "try to connect to:", addr, serverport, "with parameters:", pattern, localaddr, localport, sslcfg, startssl ) | |
675 local sslctx | |
676 if sslcfg then -- handle ssl/new context | |
677 if not ssl then | |
678 debug "need luasec, but not available" | |
679 return nil, "luasec not found" | |
680 end | |
681 sslctx, err = ssl.newcontext( sslcfg ) | |
682 if err then | |
683 debug( "cannot create new ssl context:", err ) | |
684 return nil, err | |
685 end | |
686 end | |
687 end | |
688 end )( ) | |
689 | |
690 local addclient = ( function( ) | |
691 return function( addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) | |
692 local client, err = socket.tcp() -- creating new socket | |
693 if not client then | |
694 debug( "cannot create socket:", err ) | |
695 return nil, err | |
696 end | |
697 client:settimeout( 0 ) -- set nonblocking | |
698 if localaddr then | |
699 local res, err = client:bind( localaddr, localport, -1 ) | |
700 if not res then | |
701 debug( "cannot bind client:", err ) | |
702 return nil, err | |
703 end | |
704 end | |
705 local res, err = client:connect( addr, serverport ) -- connect | |
706 if res or ( err == "timeout" ) then | |
707 local ip, port = client:getsockname( ) | |
708 local server = function( ) | |
709 return nil, "this is a dummy server interface" | |
710 end | |
711 local interface = handleclient( client, ip, port, server, pattern, listener, sslctx ) | |
712 interface:_start_connection( startssl ) | |
713 debug( "new connection id:", interface ) | |
714 return interface, err | |
715 else | |
716 debug( "new connection failed:", err ) | |
717 return nil, err | |
718 end | |
719 return wrapclient( client, addr, serverport, listener, pattern, localaddr, localport, sslcfg, startssl ) | |
720 end | |
721 end )( ) | |
722 | |
723 local loop = function( ) -- starts the event loop | |
724 return base:loop( ) | |
725 end | |
726 | |
727 local newevent = ( function( ) | |
728 local add = base.addevent | |
729 return function( ... ) | |
730 return add( base, ... ) | |
731 end | |
732 end )( ) | |
733 | |
734 local closeallservers = function( arg ) | |
735 for _, item in ipairs( interfacelist( ) ) do | |
736 if item "type" == "server" then | |
737 item( "close", arg ) | |
738 end | |
739 end | |
740 end | |
741 | |
742 return { | |
743 | |
744 cfg = cfg, | |
745 base = base, | |
746 loop = loop, | |
747 event = event, | |
748 addevent = newevent, | |
749 addserver = addserver, | |
750 addclient = addclient, | |
751 wrapclient = wrapclient, | |
752 closeallservers = closeallservers, | |
753 | |
754 __NAME = SCRIPT_NAME, | |
755 __DATE = LAST_MODIFIED, | |
756 __AUTHOR = SCRIPT_AUTHOR, | |
757 __VERSION = SCRIPT_VERSION, | |
758 | |
759 } |