Software /
code /
prosody
Comparison
net/server_event.lua @ 6861:39789f1669cb
Merge 0.10->trunk
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Fri, 25 Sep 2015 18:03:44 +0200 |
parent | 6820:40d50c239564 |
parent | 6860:153a2e49c4f7 |
child | 6862:7a81fb23de5d |
comparison
equal
deleted
inserted
replaced
6847:c314e9142e9d | 6861:39789f1669cb |
---|---|
9 -- to do some of the above, use timeout events or something what will called from outside | 9 -- to do some of the above, use timeout events or something what will called from outside |
10 -- dont let garbagecollect eventcallbacks, as long they are running | 10 -- dont let garbagecollect eventcallbacks, as long they are running |
11 -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing | 11 -- when using luasec, there are 4 cases of timeout errors: wantread or wantwrite during reading or writing |
12 | 12 |
13 --]] | 13 --]] |
14 -- luacheck: ignore 212/self 431/err 211/ret | |
14 | 15 |
15 local SCRIPT_NAME = "server_event.lua" | 16 local SCRIPT_NAME = "server_event.lua" |
16 local SCRIPT_VERSION = "0.05" | 17 local SCRIPT_VERSION = "0.05" |
17 local SCRIPT_AUTHOR = "blastbeat" | 18 local SCRIPT_AUTHOR = "blastbeat" |
18 local LAST_MODIFIED = "2009/11/20" | 19 local LAST_MODIFIED = "2009/11/20" |
30 CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts | 31 CONNECT_TIMEOUT = 20, -- timeout in seconds for connection attempts |
31 CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) | 32 CLEAR_DELAY = 5, -- seconds to wait for clearing interface list (and calling ondisconnect listeners) |
32 DEBUG = true, -- show debug messages | 33 DEBUG = true, -- show debug messages |
33 } | 34 } |
34 | 35 |
35 local function use(x) return rawget(_G, x); end | 36 local pairs = pairs |
36 local ipairs = use "ipairs" | 37 local select = select |
37 local string = use "string" | 38 local require = require |
38 local select = use "select" | 39 local tostring = tostring |
39 local require = use "require" | 40 local setmetatable = setmetatable |
40 local tostring = use "tostring" | |
41 local coroutine = use "coroutine" | |
42 local setmetatable = use "setmetatable" | |
43 | 41 |
44 local t_insert = table.insert | 42 local t_insert = table.insert |
45 local t_concat = table.concat | 43 local t_concat = table.concat |
44 local s_sub = string.sub | |
45 | |
46 local coroutine_wrap = coroutine.wrap | |
47 local coroutine_yield = coroutine.yield | |
46 | 48 |
47 local has_luasec, ssl = pcall ( require , "ssl" ) | 49 local has_luasec, ssl = pcall ( require , "ssl" ) |
48 local socket = use "socket" or require "socket" | 50 local socket = require "socket" |
51 local levent = require "luaevent.core" | |
52 | |
53 local socket_gettime = socket.gettime | |
49 local getaddrinfo = socket.dns.getaddrinfo | 54 local getaddrinfo = socket.dns.getaddrinfo |
50 | 55 |
51 local log = require ("util.logger").init("socket") | 56 local log = require ("util.logger").init("socket") |
52 | 57 |
53 local function debug(...) | 58 local function debug(...) |
54 return log("debug", ("%s "):rep(select('#', ...)), ...) | 59 return log("debug", ("%s "):rep(select('#', ...)), ...) |
55 end | 60 end |
56 local vdebug = debug; | 61 -- local vdebug = debug; |
57 | 62 |
58 local bitor = ( function( ) -- thx Rici Lake | 63 local bitor = ( function( ) -- thx Rici Lake |
59 local hasbit = function( x, p ) | 64 local hasbit = function( x, p ) |
60 return x % ( p + p ) >= p | 65 return x % ( p + p ) >= p |
61 end | 66 end |
71 end | 76 end |
72 return z | 77 return z |
73 end | 78 end |
74 end )( ) | 79 end )( ) |
75 | 80 |
76 local event = require "luaevent.core" | 81 local base = levent.new( ) |
77 local base = event.new( ) | 82 local addevent = base.addevent |
78 local EV_READ = event.EV_READ | 83 local EV_READ = levent.EV_READ |
79 local EV_WRITE = event.EV_WRITE | 84 local EV_WRITE = levent.EV_WRITE |
80 local EV_TIMEOUT = event.EV_TIMEOUT | 85 local EV_TIMEOUT = levent.EV_TIMEOUT |
81 local EV_SIGNAL = event.EV_SIGNAL | 86 local EV_SIGNAL = levent.EV_SIGNAL |
82 | 87 |
83 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) | 88 local EV_READWRITE = bitor( EV_READ, EV_WRITE ) |
84 | 89 |
85 local interfacelist = ( function( ) -- holds the interfaces for sockets | 90 local interfacelist = { } |
86 local array = { } | 91 |
87 local len = 0 | 92 -- Client interface methods |
88 return function( method, arg ) | 93 local interface_mt = {}; interface_mt.__index = interface_mt; |
89 if "add" == method then | 94 |
90 len = len + 1 | 95 -- Private methods |
91 array[ len ] = arg | 96 function interface_mt:_close() |
92 arg:_position( len ) | 97 return self:_destroy(); |
93 return len | 98 end |
94 elseif "delete" == method then | 99 |
95 if len <= 0 then | 100 function interface_mt:_start_connection(plainssl) -- called from wrapclient |
96 return nil, "array is already empty" | 101 local callback = function( event ) |
97 end | 102 if EV_TIMEOUT == event then -- timeout during connection |
98 local position = arg:_position() -- get position in array | 103 self.fatalerror = "connection timeout" |
99 if position ~= len then | 104 self:ontimeout() -- call timeout listener |
100 local interface = array[ len ] -- get last interface | 105 self:_close() |
101 array[ position ] = interface -- copy it into free position | 106 debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) |
102 array[ len ] = nil -- free last position | |
103 interface:_position( position ) -- set new position in array | |
104 else -- free last position | |
105 array[ len ] = nil | |
106 end | |
107 len = len - 1 | |
108 return len | |
109 else | 107 else |
110 return array | 108 if plainssl and has_luasec then -- start ssl session |
111 end | 109 self:starttls(self._sslctx, true) |
112 end | 110 else -- normal connection |
113 end )( ) | 111 self:_start_session(true) |
114 | 112 end |
115 -- Client interface methods | 113 debug( "new connection established. id:", self.id ) |
116 local interface_mt | 114 end |
117 do | 115 self.eventconnect = nil |
118 interface_mt = {}; interface_mt.__index = interface_mt; | 116 return -1 |
119 | 117 end |
120 local addevent = base.addevent | 118 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) |
121 local coroutine_wrap, coroutine_yield = coroutine.wrap,coroutine.yield | 119 return true |
122 | 120 end |
123 -- Private methods | 121 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl |
124 function interface_mt:_position(new_position) | 122 if self.type == "client" then |
125 self.position = new_position or self.position | 123 local callback = function( ) |
126 return self.position; | 124 self:_lock( false, false, false ) |
127 end | 125 --vdebug( "start listening on client socket with id:", self.id ) |
128 function interface_mt:_close() | 126 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback |
129 return self:_destroy(); | 127 if call_onconnect then |
130 end | 128 self:onconnect() |
131 | 129 end |
132 function interface_mt:_start_connection(plainssl) -- called from wrapclient | 130 self.eventsession = nil |
133 local callback = function( event ) | 131 return -1 |
134 if EV_TIMEOUT == event then -- timeout during connection | 132 end |
135 self.fatalerror = "connection timeout" | 133 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) |
136 self:ontimeout() -- call timeout listener | 134 else |
137 self:_close() | 135 self:_lock( false ) |
138 debug( "new connection failed. id:", self.id, "error:", self.fatalerror ) | 136 --vdebug( "start listening on server socket with id:", self.id ) |
137 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback | |
138 end | |
139 return true | |
140 end | |
141 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first | |
142 --vdebug( "starting ssl session with client id:", self.id ) | |
143 local _ | |
144 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! | |
145 _ = self.eventwrite and self.eventwrite:close( ) | |
146 self.eventread, self.eventwrite = nil, nil | |
147 local err | |
148 self.conn, err = ssl.wrap( self.conn, self._sslctx ) | |
149 if err then | |
150 self.fatalerror = err | |
151 self.conn = nil -- cannot be used anymore | |
152 if call_onconnect then | |
153 self.ondisconnect = nil -- dont call this when client isnt really connected | |
154 end | |
155 self:_close() | |
156 debug( "fatal error while ssl wrapping:", err ) | |
157 return false | |
158 end | |
159 self.conn:settimeout( 0 ) -- set non blocking | |
160 local handshakecallback = coroutine_wrap(function( event ) | |
161 local _, err | |
162 local attempt = 0 | |
163 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS | |
164 while attempt < maxattempt do -- no endless loop | |
165 attempt = attempt + 1 | |
166 debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) | |
167 if attempt > maxattempt then | |
168 self.fatalerror = "max handshake attempts exceeded" | |
169 elseif EV_TIMEOUT == event then | |
170 self.fatalerror = "timeout during handshake" | |
171 else | |
172 _, err = self.conn:dohandshake( ) | |
173 if not err then | |
174 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | |
175 self.send = self.conn.send -- caching table lookups with new client object | |
176 self.receive = self.conn.receive | |
177 if not call_onconnect then -- trigger listener | |
178 self:onstatus("ssl-handshake-complete"); | |
179 end | |
180 self:_start_session( call_onconnect ) | |
181 debug( "ssl handshake done" ) | |
182 self.eventhandshake = nil | |
183 return -1 | |
184 end | |
185 if err == "wantwrite" then | |
186 event = EV_WRITE | |
187 elseif err == "wantread" then | |
188 event = EV_READ | |
139 else | 189 else |
140 if plainssl and has_luasec then -- start ssl session | 190 debug( "ssl handshake error:", err ) |
141 self:starttls(self._sslctx, true) | 191 self.fatalerror = err |
142 else -- normal connection | |
143 self:_start_session(true) | |
144 end | |
145 debug( "new connection established. id:", self.id ) | |
146 end | 192 end |
147 self.eventconnect = nil | 193 end |
148 return -1 | 194 if self.fatalerror then |
149 end | |
150 self.eventconnect = addevent( base, self.conn, EV_WRITE, callback, cfg.CONNECT_TIMEOUT ) | |
151 return true | |
152 end | |
153 function interface_mt:_start_session(call_onconnect) -- new session, for example after startssl | |
154 if self.type == "client" then | |
155 local callback = function( ) | |
156 self:_lock( false, false, false ) | |
157 --vdebug( "start listening on client socket with id:", self.id ) | |
158 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
159 if call_onconnect then | |
160 self:onconnect() | |
161 end | |
162 self.eventsession = nil | |
163 return -1 | |
164 end | |
165 self.eventsession = addevent( base, nil, EV_TIMEOUT, callback, 0 ) | |
166 else | |
167 self:_lock( false ) | |
168 --vdebug( "start listening on server socket with id:", self.id ) | |
169 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback ) -- register callback | |
170 end | |
171 return true | |
172 end | |
173 function interface_mt:_start_ssl(call_onconnect) -- old socket will be destroyed, therefore we have to close read/write events first | |
174 --vdebug( "starting ssl session with client id:", self.id ) | |
175 local _ | |
176 _ = self.eventread and self.eventread:close( ) -- close events; this must be called outside of the event callbacks! | |
177 _ = self.eventwrite and self.eventwrite:close( ) | |
178 self.eventread, self.eventwrite = nil, nil | |
179 local err | |
180 self.conn, err = ssl.wrap( self.conn, self._sslctx ) | |
181 if err then | |
182 self.fatalerror = err | |
183 self.conn = nil -- cannot be used anymore | |
184 if call_onconnect then | 195 if call_onconnect then |
185 self.ondisconnect = nil -- dont call this when client isnt really connected | 196 self.ondisconnect = nil -- dont call this when client isnt really connected |
186 end | 197 end |
187 self:_close() | 198 self:_close() |
188 debug( "fatal error while ssl wrapping:", err ) | 199 debug( "handshake failed because:", self.fatalerror ) |
189 return false | 200 self.eventhandshake = nil |
190 end | 201 return -1 |
191 self.conn:settimeout( 0 ) -- set non blocking | 202 end |
192 local handshakecallback = coroutine_wrap( | 203 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... |
193 function( event ) | 204 end |
194 local _, err | 205 end |
195 local attempt = 0 | 206 ) |
196 local maxattempt = cfg.MAX_HANDSHAKE_ATTEMPTS | 207 debug "starting handshake..." |
197 while attempt < maxattempt do -- no endless loop | 208 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked |
198 attempt = attempt + 1 | 209 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) |
199 debug( "ssl handshake of client with id:"..tostring(self)..", attempt:"..attempt ) | 210 return true |
200 if attempt > maxattempt then | 211 end |
201 self.fatalerror = "max handshake attempts exceeded" | 212 function interface_mt:_destroy() -- close this interface + events and call last listener |
202 elseif EV_TIMEOUT == event then | 213 debug( "closing client with id:", self.id, self.fatalerror ) |
203 self.fatalerror = "timeout during handshake" | 214 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions |
204 else | 215 local _ |
205 _, err = self.conn:dohandshake( ) | 216 _ = self.eventread and self.eventread:close( ) |
206 if not err then | 217 if self.type == "client" then |
207 self:_lock( false, false, false ) -- unlock the interface; sending, closing etc allowed | 218 _ = self.eventwrite and self.eventwrite:close( ) |
208 self.send = self.conn.send -- caching table lookups with new client object | 219 _ = self.eventhandshake and self.eventhandshake:close( ) |
209 self.receive = self.conn.receive | 220 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) |
210 if not call_onconnect then -- trigger listener | 221 _ = self.eventconnect and self.eventconnect:close( ) |
211 self:onstatus("ssl-handshake-complete"); | 222 _ = self.eventsession and self.eventsession:close( ) |
212 end | 223 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) |
213 self:_start_session( call_onconnect ) | 224 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) |
214 debug( "ssl handshake done" ) | 225 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) |
215 self.eventhandshake = nil | 226 _ = self.conn and self.conn:close( ) -- close connection |
216 return -1 | 227 _ = self._server and self._server:counter(-1); |
217 end | 228 self.eventread, self.eventwrite = nil, nil |
218 if err == "wantwrite" then | 229 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil |
219 event = EV_WRITE | 230 self.readcallback, self.writecallback = nil, nil |
220 elseif err == "wantread" then | 231 else |
221 event = EV_READ | 232 self.conn:close( ) |
222 else | 233 self.eventread, self.eventclose = nil, nil |
223 debug( "ssl handshake error:", err ) | 234 self.interface, self.readcallback = nil, nil |
224 self.fatalerror = err | 235 end |
225 end | 236 interfacelist[ self ] = nil |
226 end | 237 return true |
227 if self.fatalerror then | 238 end |
228 if call_onconnect then | 239 |
229 self.ondisconnect = nil -- dont call this when client isnt really connected | 240 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events |
230 end | 241 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting |
231 self:_close() | 242 return nointerface, noreading, nowriting |
232 debug( "handshake failed because:", self.fatalerror ) | 243 end |
233 self.eventhandshake = nil | 244 |
234 return -1 | 245 --TODO: Deprecate |
235 end | 246 function interface_mt:lock_read(switch) |
236 event = coroutine_yield( event, cfg.HANDSHAKE_TIMEOUT ) -- yield this monster... | 247 if switch then |
248 return self:pause(); | |
249 else | |
250 return self:resume(); | |
251 end | |
252 end | |
253 | |
254 function interface_mt:pause() | |
255 return self:_lock(self.nointerface, true, self.nowriting); | |
256 end | |
257 | |
258 function interface_mt:resume() | |
259 self:_lock(self.nointerface, false, self.nowriting); | |
260 if not self.eventread then | |
261 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
262 end | |
263 end | |
264 | |
265 function interface_mt:counter(c) | |
266 if c then | |
267 self._connections = self._connections + c | |
268 end | |
269 return self._connections | |
270 end | |
271 | |
272 -- Public methods | |
273 function interface_mt:write(data) | |
274 if self.nowriting then return nil, "locked" end | |
275 --vdebug( "try to send data to client, id/data:", self.id, data ) | |
276 data = tostring( data ) | |
277 local len = #data | |
278 local total = len + self.writebufferlen | |
279 if total > cfg.MAX_SEND_LENGTH then -- check buffer length | |
280 local err = "send buffer exceeded" | |
281 debug( "error:", err ) -- to much, check your app | |
282 return nil, err | |
283 end | |
284 t_insert(self.writebuffer, data) -- new buffer | |
285 self.writebufferlen = total | |
286 if not self.eventwrite then -- register new write event | |
287 --vdebug( "register new write event" ) | |
288 self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT ) | |
289 end | |
290 return true | |
291 end | |
292 function interface_mt:close() | |
293 if self.nointerface then return nil, "locked"; end | |
294 debug( "try to close client connection with id:", self.id ) | |
295 if self.type == "client" then | |
296 self.fatalerror = "client to close" | |
297 if self.eventwrite then -- wait for incomplete write request | |
298 self:_lock( true, true, false ) | |
299 debug "closing delayed until writebuffer is empty" | |
300 return nil, "writebuffer not empty, waiting" | |
301 else -- close now | |
302 self:_lock( true, true, true ) | |
303 self:_close() | |
304 return true | |
305 end | |
306 else | |
307 debug( "try to close server with id:", tostring(self.id)) | |
308 self.fatalerror = "server to close" | |
309 self:_lock( true ) | |
310 self:_close( 0 ) | |
311 return true | |
312 end | |
313 end | |
314 | |
315 function interface_mt:socket() | |
316 return self.conn | |
317 end | |
318 | |
319 function interface_mt:server() | |
320 return self._server or self; | |
321 end | |
322 | |
323 function interface_mt:port() | |
324 return self._port | |
325 end | |
326 | |
327 function interface_mt:serverport() | |
328 return self._serverport | |
329 end | |
330 | |
331 function interface_mt:ip() | |
332 return self._ip | |
333 end | |
334 | |
335 function interface_mt:ssl() | |
336 return self._usingssl | |
337 end | |
338 interface_mt.clientport = interface_mt.port -- COMPAT server_select | |
339 | |
340 function interface_mt:type() | |
341 return self._type or "client" | |
342 end | |
343 | |
344 function interface_mt:connections() | |
345 return self._connections | |
346 end | |
347 | |
348 function interface_mt:address() | |
349 return self.addr | |
350 end | |
351 | |
352 function interface_mt:set_sslctx(sslctx) | |
353 self._sslctx = sslctx; | |
354 if sslctx then | |
355 self.starttls = nil; -- use starttls() of interface_mt | |
356 else | |
357 self.starttls = false; -- prevent starttls() | |
358 end | |
359 end | |
360 | |
361 function interface_mt:set_mode(pattern) | |
362 if pattern then | |
363 self._pattern = pattern; | |
364 end | |
365 return self._pattern; | |
366 end | |
367 | |
368 function interface_mt:set_send(new_send) -- luacheck: ignore 212 | |
369 -- No-op, we always use the underlying connection's send | |
370 end | |
371 | |
372 function interface_mt:starttls(sslctx, call_onconnect) | |
373 debug( "try to start ssl at client id:", self.id ) | |
374 local err | |
375 self._sslctx = sslctx; | |
376 if self._usingssl then -- startssl was already called | |
377 err = "ssl already active" | |
378 end | |
379 if err then | |
380 debug( "error:", err ) | |
381 return nil, err | |
382 end | |
383 self._usingssl = true | |
384 self.startsslcallback = function( ) -- we have to start the handshake outside of a read/write event | |
385 self.startsslcallback = nil | |
386 self:_start_ssl(call_onconnect); | |
387 self.eventstarthandshake = nil | |
388 return -1 | |
389 end | |
390 if not self.eventwrite then | |
391 self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake | |
392 self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake | |
393 else -- wait until writebuffer is empty | |
394 self:_lock( true, true, false ) | |
395 debug "ssl session delayed until writebuffer is empty..." | |
396 end | |
397 self.starttls = false; | |
398 return true | |
399 end | |
400 | |
401 function interface_mt:setoption(option, value) | |
402 if self.conn.setoption then | |
403 return self.conn:setoption(option, value); | |
404 end | |
405 return false, "setoption not implemented"; | |
406 end | |
407 | |
408 function interface_mt:setlistener(listener) | |
409 self:ondetach(); -- Notify listener that it is no longer responsible for this connection | |
410 self.onconnect, self.ondisconnect, self.onincoming, self.ontimeout, | |
411 self.onreadtimeout, self.onstatus, self.ondetach | |
412 = listener.onconnect, listener.ondisconnect, listener.onincoming, listener.ontimeout, | |
413 listener.onreadtimeout, listener.onstatus, listener.ondetach; | |
414 end | |
415 | |
416 -- Stub handlers | |
417 function interface_mt:onconnect() | |
418 end | |
419 function interface_mt:onincoming() | |
420 end | |
421 function interface_mt:ondisconnect() | |
422 end | |
423 function interface_mt:ontimeout() | |
424 end | |
425 function interface_mt:onreadtimeout() | |
426 self.fatalerror = "timeout during receiving" | |
427 debug( "connection failed:", self.fatalerror ) | |
428 self:_close() | |
429 self.eventread = nil | |
430 end | |
431 function interface_mt:ondrain() | |
432 end | |
433 function interface_mt:ondetach() | |
434 end | |
435 function interface_mt:onstatus() | |
436 end | |
437 | |
438 -- End of client interface methods | |
439 | |
440 local function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface | |
441 --vdebug("creating client interfacce...") | |
442 local interface = { | |
443 type = "client"; | |
444 conn = client; | |
445 currenttime = socket_gettime( ); -- safe the origin | |
446 writebuffer = {}; -- writebuffer | |
447 writebufferlen = 0; -- length of writebuffer | |
448 send = client.send; -- caching table lookups | |
449 receive = client.receive; | |
450 onconnect = listener.onconnect; -- will be called when client disconnects | |
451 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
452 onincoming = listener.onincoming; -- will be called when client sends data | |
453 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | |
454 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
455 ondrain = listener.ondrain; -- called when writebuffer is empty | |
456 ondetach = listener.ondetach; -- called when disassociating this listener from this connection | |
457 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | |
458 eventread = false, eventwrite = false, eventclose = false, | |
459 eventhandshake = false, eventstarthandshake = false; -- event handler | |
460 eventconnect = false, eventsession = false; -- more event handler... | |
461 eventwritetimeout = false; -- even more event handler... | |
462 eventreadtimeout = false; | |
463 fatalerror = false; -- error message | |
464 writecallback = false; -- will be called on write events | |
465 readcallback = false; -- will be called on read events | |
466 nointerface = true; -- lock/unlock parameter of this interface | |
467 noreading = false, nowriting = false; -- locks of the read/writecallback | |
468 startsslcallback = false; -- starting handshake callback | |
469 position = false; -- position of client in interfacelist | |
470 | |
471 -- Properties | |
472 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
473 _serverport = (server and server:port() or nil), | |
474 _sslctx = sslctx; -- parameters | |
475 _usingssl = false; -- client is using ssl; | |
476 } | |
477 if not has_luasec then interface.starttls = false; end | |
478 interface.id = tostring(interface):match("%x+$"); | |
479 interface.writecallback = function( event ) -- called on write events | |
480 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
481 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
482 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
483 interface.eventwrite = false | |
484 return -1 | |
485 end | |
486 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
487 interface.fatalerror = "timeout during writing" | |
488 debug( "writing failed:", interface.fatalerror ) | |
489 interface:_close() | |
490 interface.eventwrite = false | |
491 return -1 | |
492 else -- can write :) | |
493 if interface._usingssl then -- handle luasec | |
494 if interface.eventreadtimeout then -- we have to read first | |
495 local ret = interface.readcallback( ) -- call readcallback | |
496 --vdebug( "tried to read in writecallback, result:", ret ) | |
497 end | |
498 if interface.eventwritetimeout then -- luasec only | |
499 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
500 interface.eventwritetimeout = false | |
501 end | |
502 end | |
503 interface.writebuffer = { t_concat(interface.writebuffer) } | |
504 local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) | |
505 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
506 if succ then -- writing succesful | |
507 interface.writebuffer[1] = nil | |
508 interface.writebufferlen = 0 | |
509 interface:ondrain(); | |
510 if interface.fatalerror then | |
511 debug "closing client after writing" | |
512 interface:_close() -- close interface if needed | |
513 elseif interface.startsslcallback then -- start ssl connection if needed | |
514 debug "starting ssl handshake after writing" | |
515 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
516 elseif interface.eventreadtimeout then | |
517 return EV_WRITE, EV_TIMEOUT | |
518 end | |
519 interface.eventwrite = nil | |
520 return -1 | |
521 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again | |
522 --vdebug( "writebuffer is not empty:", err ) | |
523 interface.writebuffer[1] = s_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer | |
524 interface.writebufferlen = interface.writebufferlen - byte | |
525 if "wantread" == err then -- happens only with luasec | |
526 local callback = function( ) | |
527 interface:_close() | |
528 interface.eventwritetimeout = nil | |
529 return -1; | |
237 end | 530 end |
238 end | 531 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event |
239 ) | 532 debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) |
240 debug "starting handshake..." | 533 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent |
241 self:_lock( false, true, true ) -- unlock read/write events, but keep interface locked | |
242 self.eventhandshake = addevent( base, self.conn, EV_READWRITE, handshakecallback, cfg.HANDSHAKE_TIMEOUT ) | |
243 return true | |
244 end | |
245 function interface_mt:_destroy() -- close this interface + events and call last listener | |
246 debug( "closing client with id:", self.id, self.fatalerror ) | |
247 self:_lock( true, true, true ) -- first of all, lock the interface to avoid further actions | |
248 local _ | |
249 _ = self.eventread and self.eventread:close( ) | |
250 if self.type == "client" then | |
251 _ = self.eventwrite and self.eventwrite:close( ) | |
252 _ = self.eventhandshake and self.eventhandshake:close( ) | |
253 _ = self.eventstarthandshake and self.eventstarthandshake:close( ) | |
254 _ = self.eventconnect and self.eventconnect:close( ) | |
255 _ = self.eventsession and self.eventsession:close( ) | |
256 _ = self.eventwritetimeout and self.eventwritetimeout:close( ) | |
257 _ = self.eventreadtimeout and self.eventreadtimeout:close( ) | |
258 _ = self.ondisconnect and self:ondisconnect( self.fatalerror ~= "client to close" and self.fatalerror) -- call ondisconnect listener (wont be the case if handshake failed on connect) | |
259 _ = self.conn and self.conn:close( ) -- close connection | |
260 _ = self._server and self._server:counter(-1); | |
261 self.eventread, self.eventwrite = nil, nil | |
262 self.eventstarthandshake, self.eventhandshake, self.eventclose = nil, nil, nil | |
263 self.readcallback, self.writecallback = nil, nil | |
264 else | |
265 self.conn:close( ) | |
266 self.eventread, self.eventclose = nil, nil | |
267 self.interface, self.readcallback = nil, nil | |
268 end | |
269 interfacelist( "delete", self ) | |
270 return true | |
271 end | |
272 | |
273 function interface_mt:_lock(nointerface, noreading, nowriting) -- lock or unlock this interface or events | |
274 self.nointerface, self.noreading, self.nowriting = nointerface, noreading, nowriting | |
275 return nointerface, noreading, nowriting | |
276 end | |
277 | |
278 --TODO: Deprecate | |
279 function interface_mt:lock_read(switch) | |
280 if switch then | |
281 return self:pause(); | |
282 else | |
283 return self:resume(); | |
284 end | |
285 end | |
286 | |
287 function interface_mt:pause() | |
288 return self:_lock(self.nointerface, true, self.nowriting); | |
289 end | |
290 | |
291 function interface_mt:resume() | |
292 self:_lock(self.nointerface, false, self.nowriting); | |
293 if not self.eventread then | |
294 self.eventread = addevent( base, self.conn, EV_READ, self.readcallback, cfg.READ_TIMEOUT ); -- register callback | |
295 end | |
296 end | |
297 | |
298 function interface_mt:counter(c) | |
299 if c then | |
300 self._connections = self._connections + c | |
301 end | |
302 return self._connections | |
303 end | |
304 | |
305 -- Public methods | |
-- Queue data for sending on this connection.
--   data: converted with tostring() before it is queued
-- Returns true once the data is queued, or nil plus an error string:
--   "locked"               when the nowriting lock is set
--   "send buffer exceeded" when the queued total would pass cfg.MAX_SEND_LENGTH
-- The actual socket write happens later in the write callback; a write event is
-- registered here only if none is currently pending.
function interface_mt:write(data)
	if self.nowriting then
		return nil, "locked"
	end
	local chunk = tostring( data )
	local buffered = self.writebufferlen + #chunk
	if buffered > cfg.MAX_SEND_LENGTH then -- queue would grow past the configured limit
		local err = "send buffer exceeded"
		debug( "error:", err ) -- limit hit; check the application
		return nil, err
	end
	t_insert(self.writebuffer, chunk)
	self.writebufferlen = buffered
	if self.eventwrite then -- a write event is already pending, it will pick the data up
		return true
	end
	self.eventwrite = addevent( base, self.conn, EV_WRITE, self.writecallback, cfg.WRITE_TIMEOUT )
	return true
end
-- Request that this interface be closed.
-- Returns:
--   nil, "locked"                          when the nointerface lock is set
--   nil, "writebuffer not empty, waiting"  for a client with a pending write event;
--                                          reading is locked and fatalerror is set to
--                                          "client to close" so the write callback can
--                                          finish the close after the buffer is flushed
--   true                                   when the interface was closed immediately
-- Anything whose `type` field is not the string "client" is treated as a server.
function interface_mt:close()
	if self.nointerface then return nil, "locked"; end
	debug( "try to close client connection with id:", self.id )

	if self.type ~= "client" then
		debug( "try to close server with id:", tostring(self.id))
		self.fatalerror = "server to close"
		self:_lock( true )
		self:_close( 0 )
		return true
	end

	self.fatalerror = "client to close"
	if self.eventwrite then -- wait for incomplete write request
		self:_lock( true, true, false )
		debug( "closing delayed until writebuffer is empty" )
		return nil, "writebuffer not empty, waiting"
	end

	-- nothing left to flush: lock everything and close now
	self:_lock( true, true, true )
	self:_close()
	return true
end
347 | |
-- Simple read-only accessors. Each one returns a field of the interface as-is.

-- The underlying socket object.
function interface_mt:socket() return self.conn end

-- The server interface this connection belongs to, or the interface itself
-- when no `_server` is set.
function interface_mt:server() return self._server or self; end

-- The `_port` field of this interface.
function interface_mt:port() return self._port end

-- The `_serverport` field of this interface (may be nil).
function interface_mt:serverport() return self._serverport end

-- The `_ip` field of this interface.
function interface_mt:ip() return self._ip end

-- Truthy once starttls() has marked this connection as using SSL/TLS.
function interface_mt:ssl() return self._usingssl end

interface_mt.clientport = interface_mt.port -- COMPAT server_select

-- The `_type` field, defaulting to "client".
-- NOTE(review): client interfaces built by handleclient carry their own `type`
-- string field, which would take precedence over this method on those instances.
function interface_mt:type() return self._type or "client" end

-- Current value of the connection counter.
function interface_mt:connections() return self._connections end

-- The `addr` field of this interface.
function interface_mt:address() return self.addr end
384 | |
-- Store the SSL context for this connection and enable or disable starttls().
--   sslctx: context to store; a falsy value disables starttls on this instance
function interface_mt:set_sslctx(sslctx)
	self._sslctx = sslctx;
	if not sslctx then
		self.starttls = false; -- prevent starttls()
		return
	end
	self.starttls = nil; -- use starttls() of interface_mt
end
393 | |
-- Get or set the receive pattern handed to conn:receive() by the read callback.
--   pattern: new pattern; when falsy the current pattern is left unchanged
-- Returns the pattern in effect after the call.
function interface_mt:set_mode(pattern)
	if not pattern then
		return self._pattern;
	end
	self._pattern = pattern;
	return pattern;
end
400 | |
-- Accepted for interface compatibility but intentionally does nothing:
-- this backend always sends through the underlying connection.
--   new_send: ignored
function interface_mt:set_send(new_send)
end
404 | |
-- Begin switching this connection to SSL/TLS.
--   sslctx:         context stored in `_sslctx` (stored even if the call is rejected)
--   call_onconnect: forwarded unchanged to _start_ssl()
-- Returns true when the handshake was scheduled, or nil, "ssl already active"
-- if SSL was already started on this connection.
-- The handshake itself is never run inline: it is started from a zero-delay
-- timeout event, or - when a write is still pending - left for the write
-- callback to schedule once the buffer has been flushed.
function interface_mt:starttls(sslctx, call_onconnect)
	debug( "try to start ssl at client id:", self.id )
	self._sslctx = sslctx;
	if self._usingssl then -- startssl was already called
		local err = "ssl already active"
		debug( "error:", err )
		return nil, err
	end
	self._usingssl = true

	-- we have to start the handshake outside of a read/write event
	local function start_handshake( )
		self.startsslcallback = nil
		self:_start_ssl(call_onconnect);
		self.eventstarthandshake = nil
		return -1
	end
	self.startsslcallback = start_handshake

	if self.eventwrite then -- wait until writebuffer is empty
		self:_lock( true, true, false )
		debug( "ssl session delayed until writebuffer is empty..." )
	else
		self:_lock( true, true, true ) -- lock the interface, to not disturb the handshake
		self.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, self.startsslcallback, 0 ) -- add event to start handshake
	end

	self.starttls = false; -- a second starttls() on this instance is blocked
	return true
end
433 | |
-- Forward a socket option to the underlying connection.
--   option, value: passed through unchanged to conn:setoption()
-- Returns whatever conn:setoption() returns, or false, "setoption not implemented"
-- when the connection object has no setoption method.
function interface_mt:setoption(option, value)
	local conn = self.conn
	if not conn.setoption then
		return false, "setoption not implemented";
	end
	return conn:setoption(option, value);
end
440 | |
-- Hand this connection over to a different listener.
-- The current ondetach handler is invoked first, then every handler slot on the
-- interface is replaced by the new listener's handler of the same name.
--   listener: table of handler functions; a handler the listener does not define
--             is stored as nil (presumably falling back to the stub handlers
--             on interface_mt - confirm against the metatable's __index)
function interface_mt:setlistener(listener)
	self:ondetach(); -- Notify listener that it is no longer responsible for this connection
	self.onconnect = listener.onconnect;
	self.ondisconnect = listener.ondisconnect;
	self.onincoming = listener.onincoming;
	self.ontimeout = listener.ontimeout;
	self.onreadtimeout = listener.onreadtimeout;
	self.onstatus = listener.onstatus;
	self.ondetach = listener.ondetach;
	-- ondrain is installed from the listener when the client interface is created,
	-- so it has to be swapped here as well; otherwise the detached listener
	-- would keep receiving drain notifications for this connection.
	self.ondrain = listener.ondrain;
end
448 | |
-- Stub handlers
-- Defaults defined on interface_mt for the listener callbacks. All of them do
-- nothing except onreadtimeout.
function interface_mt:onconnect() end
function interface_mt:onincoming() end
function interface_mt:ondisconnect() end
function interface_mt:ontimeout() end

-- Default reaction to a read timeout: record the error and close the connection.
-- Returns nothing, so the read callback (which only carries on when this handler
-- returns true) unregisters itself.
function interface_mt:onreadtimeout()
	local reason = "timeout during receiving"
	self.fatalerror = reason
	debug( "connection failed:", reason )
	self:_close()
	self.eventread = nil
end

function interface_mt:ondrain() end
function interface_mt:ondetach() end
function interface_mt:onstatus() end
470 end | |
471 | |
472 -- End of client interface methods | |
473 | |
474 local handleclient; | |
475 do | |
476 local string_sub = string.sub -- caching table lookups | |
477 local addevent = base.addevent | |
478 local socket_gettime = socket.gettime | |
479 function handleclient( client, ip, port, server, pattern, listener, sslctx ) -- creates an client interface | |
480 --vdebug("creating client interfacce...") | |
481 local interface = { | |
482 type = "client"; | |
483 conn = client; | |
484 currenttime = socket_gettime( ); -- safe the origin | |
485 writebuffer = {}; -- writebuffer | |
486 writebufferlen = 0; -- length of writebuffer | |
487 send = client.send; -- caching table lookups | |
488 receive = client.receive; | |
489 onconnect = listener.onconnect; -- will be called when client connects | |
490 ondisconnect = listener.ondisconnect; -- will be called when client disconnects | |
491 onincoming = listener.onincoming; -- will be called when client sends data | |
492 ontimeout = listener.ontimeout; -- called when fatal socket timeout occurs | |
493 onreadtimeout = listener.onreadtimeout; -- called when socket inactivity timeout occurs | |
494 ondrain = listener.ondrain; -- called when writebuffer is empty | |
495 ondetach = listener.ondetach; -- called when disassociating this listener from this connection | |
496 onstatus = listener.onstatus; -- called for status changes (e.g. of SSL/TLS) | |
497 eventread = false, eventwrite = false, eventclose = false, | |
498 eventhandshake = false, eventstarthandshake = false; -- event handler | |
499 eventconnect = false, eventsession = false; -- more event handler... | |
500 eventwritetimeout = false; -- even more event handler... | |
501 eventreadtimeout = false; | |
502 fatalerror = false; -- error message | |
503 writecallback = false; -- will be called on write events | |
504 readcallback = false; -- will be called on read events | |
505 nointerface = true; -- lock/unlock parameter of this interface | |
506 noreading = false, nowriting = false; -- locks of the read/writecallback | |
507 startsslcallback = false; -- starting handshake callback | |
508 position = false; -- position of client in interfacelist | |
509 | |
510 -- Properties | |
511 _ip = ip, _port = port, _server = server, _pattern = pattern, | |
512 _serverport = (server and server:port() or nil), | |
513 _sslctx = sslctx; -- parameters | |
514 _usingssl = false; -- client is using ssl; | |
515 } | |
516 if not has_luasec then interface.starttls = false; end | |
517 interface.id = tostring(interface):match("%x+$"); | |
518 interface.writecallback = function( event ) -- called on write events | |
519 --vdebug( "new client write event, id/ip/port:", interface, ip, port ) | |
520 if interface.nowriting or ( interface.fatalerror and ( "client to close" ~= interface.fatalerror ) ) then -- leave this event | |
521 --vdebug( "leaving this event because:", interface.nowriting or interface.fatalerror ) | |
522 interface.eventwrite = false | |
523 return -1 | |
524 end | |
525 if EV_TIMEOUT == event then -- took too long to write some data to socket -> disconnect | |
526 interface.fatalerror = "timeout during writing" | |
527 debug( "writing failed:", interface.fatalerror ) | |
528 interface:_close() | |
529 interface.eventwrite = false | |
530 return -1 | |
531 else -- can write :) | |
532 if interface._usingssl then -- handle luasec | |
533 if interface.eventreadtimeout then -- we have to read first | |
534 local ret = interface.readcallback( ) -- call readcallback | |
535 --vdebug( "tried to read in writecallback, result:", ret ) | |
536 end | |
537 if interface.eventwritetimeout then -- luasec only | |
538 interface.eventwritetimeout:close( ) -- first we have to close timeout event which where regged after a wantread error | |
539 interface.eventwritetimeout = false | |
540 end | |
541 end | |
542 interface.writebuffer = { t_concat(interface.writebuffer) } | |
543 local succ, err, byte = interface.conn:send( interface.writebuffer[1], 1, interface.writebufferlen ) | |
544 --vdebug( "write data:", interface.writebuffer, "error:", err, "part:", byte ) | |
545 if succ then -- writing succesful | |
546 interface.writebuffer[1] = nil | |
547 interface.writebufferlen = 0 | |
548 interface:ondrain(); | |
549 if interface.fatalerror then | |
550 debug "closing client after writing" | |
551 interface:_close() -- close interface if needed | |
552 elseif interface.startsslcallback then -- start ssl connection if needed | |
553 debug "starting ssl handshake after writing" | |
554 interface.eventstarthandshake = addevent( base, nil, EV_TIMEOUT, interface.startsslcallback, 0 ) | |
555 elseif interface.eventreadtimeout then | |
556 return EV_WRITE, EV_TIMEOUT | |
557 end | |
558 interface.eventwrite = nil | |
559 return -1 | |
560 elseif byte and (err == "timeout" or err == "wantwrite") then -- want write again | |
561 --vdebug( "writebuffer is not empty:", err ) | |
562 interface.writebuffer[1] = string_sub( interface.writebuffer[1], byte + 1, interface.writebufferlen ) -- new buffer | |
563 interface.writebufferlen = interface.writebufferlen - byte | |
564 if "wantread" == err then -- happens only with luasec | |
565 local callback = function( ) | |
566 interface:_close() | |
567 interface.eventwritetimeout = nil | |
568 return -1; | |
569 end | |
570 interface.eventwritetimeout = addevent( base, nil, EV_TIMEOUT, callback, cfg.WRITE_TIMEOUT ) -- reg a new timeout event | |
571 debug( "wantread during write attempt, reg it in readcallback but dont know what really happens next..." ) | |
572 -- hopefully this works with luasec; its simply not possible to use 2 different write events on a socket in luaevent | |
573 return -1 | |
574 end | |
575 return EV_WRITE, cfg.WRITE_TIMEOUT | |
576 else -- connection was closed during writing or fatal error | |
577 interface.fatalerror = err or "fatal error" | |
578 debug( "connection failed in write event:", interface.fatalerror ) | |
579 interface:_close() | |
580 interface.eventwrite = nil | |
581 return -1 | 534 return -1 |
582 end | 535 end |
583 end | 536 return EV_WRITE, cfg.WRITE_TIMEOUT |
584 end | 537 else -- connection was closed during writing or fatal error |
585 | 538 interface.fatalerror = err or "fatal error" |
586 interface.readcallback = function( event ) -- called on read events | 539 debug( "connection failed in write event:", interface.fatalerror ) |
587 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) | 540 interface:_close() |
588 if interface.noreading or interface.fatalerror then -- leave this event | 541 interface.eventwrite = nil |
589 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) | |
590 interface.eventread = nil | |
591 return -1 | 542 return -1 |
592 end | 543 end |
593 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then | 544 end |
594 return -1 -- took too long to get some data from client -> disconnect | 545 end |
595 end | 546 |
596 if interface._usingssl then -- handle luasec | 547 interface.readcallback = function( event ) -- called on read events |
597 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | 548 --vdebug( "new client read event, id/ip/port:", tostring(interface.id), tostring(ip), tostring(port) ) |
598 local ret = interface.writecallback( ) -- call it | 549 if interface.noreading or interface.fatalerror then -- leave this event |
599 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | 550 --vdebug( "leaving this event because:", tostring(interface.noreading or interface.fatalerror) ) |
551 interface.eventread = nil | |
552 return -1 | |
553 end | |
554 if EV_TIMEOUT == event and interface:onreadtimeout() ~= true then | |
555 return -1 -- took too long to get some data from client -> disconnect | |
556 end | |
557 if interface._usingssl then -- handle luasec | |
558 if interface.eventwritetimeout then -- ok, in the past writecallback was regged | |
559 local ret = interface.writecallback( ) -- call it | |
560 --vdebug( "tried to write in readcallback, result:", tostring(ret) ) | |
561 end | |
562 if interface.eventreadtimeout then | |
563 interface.eventreadtimeout:close( ) | |
564 interface.eventreadtimeout = nil | |
565 end | |
566 end | |
567 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | |
568 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | |
569 buffer = buffer or part | |
570 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | |
571 interface.fatalerror = "receive buffer exceeded" | |
572 debug( "fatal error:", interface.fatalerror ) | |
573 interface:_close() | |
574 interface.eventread = nil | |
575 return -1 | |
576 end | |
577 if err and ( err ~= "timeout" and err ~= "wantread" ) then | |
578 if "wantwrite" == err then -- need to read on write event | |
579 if not interface.eventwrite then -- register new write event if needed | |
580 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | |
600 end | 581 end |
601 if interface.eventreadtimeout then | 582 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, |
602 interface.eventreadtimeout:close( ) | 583 function( ) |
603 interface.eventreadtimeout = nil | 584 interface:_close() |
604 end | 585 end, cfg.READ_TIMEOUT |
605 end | 586 ) |
606 local buffer, err, part = interface.conn:receive( interface._pattern ) -- receive buffer with "pattern" | 587 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) |
607 --vdebug( "read data:", tostring(buffer), "error:", tostring(err), "part:", tostring(part) ) | 588 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... |
608 buffer = buffer or part | 589 else -- connection was closed or fatal error |
609 if buffer and #buffer > cfg.MAX_READ_LENGTH then -- check buffer length | 590 interface.fatalerror = err |
610 interface.fatalerror = "receive buffer exceeded" | 591 debug( "connection failed in read event:", interface.fatalerror ) |
611 debug( "fatal error:", interface.fatalerror ) | |
612 interface:_close() | 592 interface:_close() |
613 interface.eventread = nil | 593 interface.eventread = nil |
614 return -1 | 594 return -1 |
615 end | 595 end |
616 if err and ( err ~= "timeout" and err ~= "wantread" ) then | 596 else |
617 if "wantwrite" == err then -- need to read on write event | 597 interface.onincoming( interface, buffer, err ) -- send new data to listener |
618 if not interface.eventwrite then -- register new write event if needed | 598 end |
619 interface.eventwrite = addevent( base, interface.conn, EV_WRITE, interface.writecallback, cfg.WRITE_TIMEOUT ) | 599 if interface.noreading then |
620 end | 600 interface.eventread = nil; |
621 interface.eventreadtimeout = addevent( base, nil, EV_TIMEOUT, | 601 return -1; |
622 function( ) | 602 end |
623 interface:_close() | 603 return EV_READ, cfg.READ_TIMEOUT |
624 end, cfg.READ_TIMEOUT | 604 end |
625 ) | 605 |
626 debug( "wantwrite during read attempt, reg it in writecallback but dont know what really happens next..." ) | 606 client:settimeout( 0 ) -- set non blocking |
627 -- to be honest i dont know what happens next, if it is allowed to first read, the write etc... | 607 setmetatable(interface, interface_mt) |
628 else -- connection was closed or fatal error | 608 interfacelist[ interface ] = true -- add to interfacelist |
629 interface.fatalerror = err | 609 return interface |
630 debug( "connection failed in read event:", interface.fatalerror ) | 610 end |
631 interface:_close() | 611 |
632 interface.eventread = nil | 612 local function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface |
633 return -1 | 613 debug "creating server interface..." |
634 end | 614 local interface = { |
615 _connections = 0; | |
616 | |
617 conn = server; | |
618 onconnect = listener.onconnect; -- will be called when new client connected | |
619 eventread = false; -- read event handler | |
620 eventclose = false; -- close event handler | |
621 readcallback = false; -- read event callback | |
622 fatalerror = false; -- error message | |
623 nointerface = true; -- lock/unlock parameter | |
624 | |
625 _ip = addr, _port = port, _pattern = pattern, | |
626 _sslctx = sslctx; | |
627 } | |
628 interface.id = tostring(interface):match("%x+$"); | |
629 interface.readcallback = function( event ) -- server handler, called on incoming connections | |
630 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | |
631 if interface.fatalerror then | |
632 --vdebug( "leaving this event because:", self.fatalerror ) | |
633 interface.eventread = nil | |
634 return -1 | |
635 end | |
636 local delay = cfg.ACCEPT_DELAY | |
637 if EV_TIMEOUT == event then | |
638 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | |
639 debug( "to many connections, seconds to wait for next accept:", delay ) | |
640 return EV_TIMEOUT, delay -- timeout... | |
635 else | 641 else |
636 interface.onincoming( interface, buffer, err ) -- send new data to listener | 642 return EV_READ -- accept again |
637 end | 643 end |
638 if interface.noreading then | 644 end |
639 interface.eventread = nil; | 645 --vdebug("max connection check ok, accepting...") |
640 return -1; | 646 local client, err = server:accept() -- try to accept; TODO: check err |
641 end | 647 while client do |
642 return EV_READ, cfg.READ_TIMEOUT | 648 if interface._connections >= cfg.MAX_CONNECTIONS then |
643 end | 649 client:close( ) -- refuse connection |
644 | 650 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) |
645 client:settimeout( 0 ) -- set non blocking | 651 return EV_TIMEOUT, delay -- delay for next accept attempt |
646 setmetatable(interface, interface_mt) | 652 end |
647 interfacelist( "add", interface ) -- add to interfacelist | 653 local client_ip, client_port = client:getpeername( ) |
648 return interface | 654 interface._connections = interface._connections + 1 -- increase connection count |
649 end | 655 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) |
650 end | 656 --vdebug( "client id:", clientinterface, "startssl:", startssl ) |
651 | 657 if has_luasec and sslctx then |
652 local handleserver | 658 clientinterface:starttls(sslctx, true) |
653 do | 659 else |
654 function handleserver( server, addr, port, pattern, listener, sslctx ) -- creates an server interface | 660 clientinterface:_start_session( true ) |
655 debug "creating server interface..." | 661 end |
656 local interface = { | 662 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); |
657 _connections = 0; | 663 |
658 | 664 client, err = server:accept() -- try to accept again |
659 conn = server; | 665 end |
660 onconnect = listener.onconnect; -- will be called when new client connected | 666 return EV_READ |
661 eventread = false; -- read event handler | 667 end |
662 eventclose = false; -- close event handler | 668 |
663 readcallback = false; -- read event callback | 669 server:settimeout( 0 ) |
664 fatalerror = false; -- error message | 670 setmetatable(interface, interface_mt) |
665 nointerface = true; -- lock/unlock parameter | 671 interfacelist[ interface ] = true |
666 | 672 interface:_start_session() |
667 _ip = addr, _port = port, _pattern = pattern, | 673 return interface |
668 _sslctx = sslctx; | 674 end |
669 } | 675 |
670 interface.id = tostring(interface):match("%x+$"); | 676 local function addserver( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments |
671 interface.readcallback = function( event ) -- server handler, called on incoming connections | 677 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") |
672 --vdebug( "server can accept, id/addr/port:", interface, addr, port ) | 678 if sslctx and not has_luasec then |
673 if interface.fatalerror then | 679 debug "fatal error: luasec not found" |
674 --vdebug( "leaving this event because:", self.fatalerror ) | 680 return nil, "luasec not found" |
675 interface.eventread = nil | 681 end |
676 return -1 | 682 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket |
677 end | 683 if not server then |
678 local delay = cfg.ACCEPT_DELAY | 684 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) |
679 if EV_TIMEOUT == event then | 685 return nil, err |
680 if interface._connections >= cfg.MAX_CONNECTIONS then -- check connection count | 686 end |
681 debug( "to many connections, seconds to wait for next accept:", delay ) | 687 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler |
682 return EV_TIMEOUT, delay -- timeout... | 688 debug( "new server created with id:", tostring(interface)) |
683 else | 689 return interface |
684 return EV_READ -- accept again | 690 end |
685 end | 691 |
686 end | 692 local function wrapclient( client, ip, port, listeners, pattern, sslctx ) |
687 --vdebug("max connection check ok, accepting...") | 693 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) |
688 local client, err = server:accept() -- try to accept; TODO: check err | 694 interface:_start_connection(sslctx) |
689 while client do | 695 return interface, client |
690 if interface._connections >= cfg.MAX_CONNECTIONS then | 696 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface |
691 client:close( ) -- refuse connection | 697 end |
692 debug( "maximal connections reached, refuse client connection; accept delay:", delay ) | 698 |
693 return EV_TIMEOUT, delay -- delay for next accept attempt | 699 local function addclient( addr, serverport, listener, pattern, sslctx, typ ) |
694 end | 700 if sslctx and not has_luasec then |
695 local client_ip, client_port = client:getpeername( ) | 701 debug "need luasec, but not available" |
696 interface._connections = interface._connections + 1 -- increase connection count | 702 return nil, "luasec not found" |
697 local clientinterface = handleclient( client, client_ip, client_port, interface, pattern, listener, sslctx ) | 703 end |
698 --vdebug( "client id:", clientinterface, "startssl:", startssl ) | 704 if getaddrinfo and not typ then |
699 if has_luasec and sslctx then | 705 local addrinfo, err = getaddrinfo(addr) |
700 clientinterface:starttls(sslctx, true) | 706 if not addrinfo then return nil, err end |
701 else | 707 if addrinfo[1] and addrinfo[1].family == "inet6" then |
702 clientinterface:_start_session( true ) | 708 typ = "tcp6" |
703 end | 709 end |
704 debug( "accepted incoming client connection from:", client_ip or "<unknown IP>", client_port or "<unknown port>", "to", port or "<unknown port>"); | 710 end |
705 | 711 local create = socket[typ or "tcp"] |
706 client, err = server:accept() -- try to accept again | 712 if type( create ) ~= "function" then |
707 end | 713 return nil, "invalid socket type" |
708 return EV_READ | 714 end |
709 end | 715 local client, err = create() -- creating new socket |
710 | 716 if not client then |
711 server:settimeout( 0 ) | 717 debug( "cannot create socket:", err ) |
712 setmetatable(interface, interface_mt) | 718 return nil, err |
713 interfacelist( "add", interface ) | 719 end |
714 interface:_start_session() | 720 client:settimeout( 0 ) -- set nonblocking |
715 return interface | 721 local res, err = client:connect( addr, serverport ) -- connect |
716 end | 722 if res or ( err == "timeout" or err == "Operation already in progress" ) then |
717 end | 723 if client.getsockname then |
718 | 724 addr = client:getsockname( ) |
719 local addserver = ( function( ) | 725 end |
720 return function( addr, port, listener, pattern, sslctx, startssl ) -- TODO: check arguments | 726 local interface = wrapclient( client, addr, serverport, listener, pattern, sslctx ) |
721 --vdebug( "creating new tcp server with following parameters:", addr or "nil", port or "nil", sslctx or "nil", startssl or "nil") | 727 debug( "new connection id:", interface.id ) |
722 if sslctx and not has_luasec then | 728 return interface, err |
723 debug "fatal error: luasec not found" | 729 else |
724 return nil, "luasec not found" | 730 debug( "new connection failed:", err ) |
725 end | 731 return nil, err |
726 local server, err = socket.bind( addr, port, cfg.ACCEPT_QUEUE ) -- create server socket | 732 end |
727 if not server then | 733 end |
728 debug( "creating server socket on "..addr.." port "..port.." failed:", err ) | 734 |
729 return nil, err | 735 local function loop( ) -- starts the event loop |
730 end | |
731 local interface = handleserver( server, addr, port, pattern, listener, sslctx, startssl ) -- new server handler | |
732 debug( "new server created with id:", tostring(interface)) | |
733 return interface | |
734 end | |
735 end )( ) | |
736 | |
737 local addclient, wrapclient | |
738 do | |
739 function wrapclient( client, ip, port, listeners, pattern, sslctx ) | |
740 local interface = handleclient( client, ip, port, nil, pattern, listeners, sslctx ) | |
741 interface:_start_connection(sslctx) | |
742 return interface, client | |
743 --function handleclient( client, ip, port, server, pattern, listener, _, sslctx ) -- creates an client interface | |
744 end | |
745 | |
746 function addclient( addr, serverport, listener, pattern, sslctx, typ ) | |
747 if sslctx and not has_luasec then | |
748 debug "need luasec, but not available" | |
749 return nil, "luasec not found" | |
750 end | |
751 if getaddrinfo and not typ then | |
752 local addrinfo, err = getaddrinfo(addr) | |
753 if not addrinfo then return nil, err end | |
754 if addrinfo[1] and addrinfo[1].family == "inet6" then | |
755 typ = "tcp6" | |
756 end | |
757 end | |
758 local create = socket[typ or "tcp"] | |
759 if type( create ) ~= "function" then | |
760 return nil, "invalid socket type" | |
761 end | |
762 local client, err = create() -- creating new socket | |
763 if not client then | |
764 debug( "cannot create socket:", err ) | |
765 return nil, err | |
766 end | |
767 client:settimeout( 0 ) -- set nonblocking | |
768 local res, err = client:connect( addr, serverport ) -- connect | |
769 if res or ( err == "timeout" or err == "Operation already in progress" ) then | |
770 if client.getsockname then | |
771 addr = client:getsockname( ) | |
772 end | |
773 local interface = wrapclient( client, addr, serverport, listener, pattern, sslctx ) | |
774 debug( "new connection id:", interface.id ) | |
775 return interface, err | |
776 else | |
777 debug( "new connection failed:", err ) | |
778 return nil, err | |
779 end | |
780 end | |
781 end | |
782 | |
783 | |
784 local loop = function( ) -- starts the event loop | |
785 base:loop( ) | 736 base:loop( ) |
786 return "quitting"; | 737 return "quitting"; |
787 end | 738 end |
788 | 739 |
789 local newevent = ( function( ) | 740 local function newevent( ... ) |
790 local add = base.addevent | 741 return addevent( base, ... ) |
791 return function( ... ) | 742 end |
792 return add( base, ... ) | 743 |
793 end | 744 local function closeallservers ( arg ) |
794 end )( ) | 745 for item in pairs( interfacelist ) do |
795 | |
796 local closeallservers = function( arg ) | |
797 for _, item in ipairs( interfacelist( ) ) do | |
798 if item.type == "server" then | 746 if item.type == "server" then |
799 item:close( arg ) | 747 item:close( arg ) |
800 end | 748 end |
801 end | 749 end |
802 end | 750 end |
815 | 763 |
816 -- We need to hold onto the events to stop them | 764 -- We need to hold onto the events to stop them |
817 -- being garbage-collected | 765 -- being garbage-collected |
818 local signal_events = {}; -- [signal_num] -> event object | 766 local signal_events = {}; -- [signal_num] -> event object |
819 local function hook_signal(signal_num, handler) | 767 local function hook_signal(signal_num, handler) |
820 local function _handler(event) | 768 local function _handler() |
821 local ret = handler(); | 769 local ret = handler(); |
822 if ret ~= false then -- Continue handling this signal? | 770 if ret ~= false then -- Continue handling this signal? |
823 return EV_SIGNAL; -- Yes | 771 return EV_SIGNAL; -- Yes |
824 end | 772 end |
825 return -1; -- Close this event | 773 return -1; -- Close this event |
864 , delay); | 812 , delay); |
865 end | 813 end |
866 end | 814 end |
867 | 815 |
868 return { | 816 return { |
869 | |
870 cfg = cfg, | 817 cfg = cfg, |
871 base = base, | 818 base = base, |
872 loop = loop, | 819 loop = loop, |
873 link = link, | 820 link = link, |
874 event = event, | 821 event = levent, |
875 event_base = base, | 822 event_base = base, |
876 addevent = newevent, | 823 addevent = newevent, |
877 addserver = addserver, | 824 addserver = addserver, |
878 addclient = addclient, | 825 addclient = addclient, |
879 wrapclient = wrapclient, | 826 wrapclient = wrapclient, |