Comparison

plugins/mod_c2s.lua @ 7284:9434a220e549

mod_c2s: Remove use of util.async
author Kim Alvefur <zash@zash.se>
date Thu, 17 Mar 2016 12:47:24 +0100
parent 7224:07a4c807a94a
child 7286:30b9433a9f3e
child 7329:ab811c1bb730
comparison
equal deleted inserted replaced
7283:32754b15b28a 7284:9434a220e549
13 local nameprep = require "util.encodings".stringprep.nameprep; 13 local nameprep = require "util.encodings".stringprep.nameprep;
14 local sessionmanager = require "core.sessionmanager"; 14 local sessionmanager = require "core.sessionmanager";
15 local st = require "util.stanza"; 15 local st = require "util.stanza";
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; 16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
17 local uuid_generate = require "util.uuid".generate; 17 local uuid_generate = require "util.uuid".generate;
18 local runner = require "util.async".runner;
19 18
20 local xpcall, tostring, type = xpcall, tostring, type; 19 local xpcall, tostring, type = xpcall, tostring, type;
21 local t_insert, t_remove = table.insert, table.remove; 20 local traceback = debug.traceback;
22 21
23 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; 22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
24 23
25 local log = module._log; 24 local log = module._log;
26 25
34 local core_process_stanza = prosody.core_process_stanza; 33 local core_process_stanza = prosody.core_process_stanza;
35 local hosts = prosody.hosts; 34 local hosts = prosody.hosts;
36 35
37 local stream_callbacks = { default_ns = "jabber:client" }; 36 local stream_callbacks = { default_ns = "jabber:client" };
38 local listener = {}; 37 local listener = {};
39 local runner_callbacks = {};
40 38
41 --- Stream events handlers 39 --- Stream events handlers
42 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
43 41
44 function stream_callbacks.streamopened(session, attr) 42 function stream_callbacks.streamopened(session, attr)
121 session.log("info", "Session closed by remote with error: %s", text); 119 session.log("info", "Session closed by remote with error: %s", text);
122 session:close(nil, text); 120 session:close(nil, text);
123 end 121 end
124 end 122 end
125 123
124 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
126 function stream_callbacks.handlestanza(session, stanza) 125 function stream_callbacks.handlestanza(session, stanza)
127 stanza = session.filter("stanzas/in", stanza); 126 stanza = session.filter("stanzas/in", stanza);
128 session.thread:run(stanza); 127 if stanza then
128 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
129 end
129 end 130 end
130 131
131 --- Session methods 132 --- Session methods
132 local function session_close(session, reason) 133 local function session_close(session, reason)
133 local log = session.log or log; 134 local log = session.log or log;
189 session:close{ condition = "not-authorized", text = "Account deleted" }; 190 session:close{ condition = "not-authorized", text = "Account deleted" };
190 end 191 end
191 end 192 end
192 end, 200); 193 end, 200);
193 194
194 function runner_callbacks:ready()
195 self.data.conn:resume();
196 end
197
198 function runner_callbacks:waiting()
199 self.data.conn:pause();
200 end
201
202 function runner_callbacks:error(err)
203 (self.data.log or log)("error", "Traceback[c2s]: %s", err);
204 end
205
206 --- Port listener 195 --- Port listener
207 function listener.onconnect(conn) 196 function listener.onconnect(conn)
208 measure_connections(1); 197 measure_connections(1);
209 local session = sm_new_session(conn); 198 local session = sm_new_session(conn);
210 sessions[conn] = session; 199 sessions[conn] = session;
237 226
238 function session.reset_stream() 227 function session.reset_stream()
239 session.notopen = true; 228 session.notopen = true;
240 session.stream:reset(); 229 session.stream:reset();
241 end 230 end
242
243 session.thread = runner(function (stanza)
244 core_process_stanza(session, stanza);
245 end, runner_callbacks, session);
246 231
247 local filter = session.filter; 232 local filter = session.filter;
248 function session.data(data) 233 function session.data(data)
249 -- Parse the data, which will store stanzas in session.pending_stanzas 234 -- Parse the data, which will store stanzas in session.pending_stanzas
250 if data then 235 if data then