Comparison

util/adminstream.lua @ 11200:bf8f2da84007

Merge 0.11->trunk
author Kim Alvefur <zash@zash.se>
date Thu, 05 Nov 2020 22:31:25 +0100
parent 10940:18e4e446a76c
child 12094:84fd6a79cda7
comparison
equal deleted inserted replaced
11199:6c7c50a4de32 11200:bf8f2da84007
1 local st = require "util.stanza";
2 local new_xmpp_stream = require "util.xmppstream".new;
3 local sessionlib = require "util.session";
4 local gettime = require "util.time".now;
5 local runner = require "util.async".runner;
6 local add_task = require "util.timer".add_task;
7 local events = require "util.events";
8 local server = require "net.server";
9
-- Seconds to wait for the remote side to answer our stream close before the
-- connection is dropped anyway (see the timer in session_close).
local stream_close_timeout = 5;

-- Module-level logger; session_close falls back to it when a session has
-- no logger of its own.
local log = require("util.logger").init("adminstream");

-- Namespace used to look up the children of a received <stream:error/>.
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";

-- Callback table passed to util.xmppstream for every admin stream; the
-- individual handlers are attached to it further down in this file.
local stream_callbacks = {
	default_ns = "xmpp:prosody.im/admin";
};
17
--- Stream callback: the remote side opened its stream.
-- The event is queued on the session's runner so that _streamopened is
-- executed in async context rather than directly from the parser.
-- @param session the session (or client) object owning the stream
-- @param attr attributes of the received stream header
function stream_callbacks.streamopened(session, attr)
	local event = { stream = "opened", attr = attr };
	session.thread:run(event);
end
22
--- Async half of streamopened; invoked from the session's runner.
-- Every session whose type is not "client" replies by opening its own
-- stream; in all cases the session is then marked as open.
-- @param session the session (or client) object owning the stream
-- @param attr attributes of the received stream header (currently unused)
function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr
	local is_client = session.type == "client";
	if not is_client then
		session:open_stream();
	end
	-- Clearing the flag tells session_close that no stream header is owed
	session.notopen = nil;
end
29
--- Stream callback: the remote side closed its stream.
-- The event is queued on the session's runner so that _streamclosed is
-- executed in async context rather than directly from the parser.
-- @param session the session (or client) object owning the stream
-- @param attr attributes passed along by the stream parser
function stream_callbacks.streamclosed(session, attr)
	local event = { stream = "closed", attr = attr };
	session.thread:run(event);
end
34
--- Async half of streamclosed; invoked from the session's runner.
-- Logs the received closing tag and closes our side of the session.
-- @param session the session (or client) object owning the stream
function stream_callbacks._streamclosed(session)
	-- false (as opposed to nil) marks the close as initiated by the remote side
	local reason = false;
	session.log("debug", "Received </stream:stream>");
	session:close(reason);
end
39
--- Stream callback for errors reported by the stream parser.
-- Handles three error kinds; any other kind is ignored:
--   "no-stream"    - invalid stream header, closes with invalid-namespace
--   "parse-error"  - malformed XML, closes with not-well-formed
--   "stream-error" - the remote sent <stream:error/>; it is logged and the
--                    session is closed without sending an error of our own
-- @param session the session (or client) object owning the stream
-- @param error string naming the kind of error
-- @param data kind-specific detail: a string for the first two kinds, the
--   received error stanza for "stream-error"
function stream_callbacks.error(session, error, data)
	if error == "no-stream" then
		-- Only the first gsub result (the rewritten string) is wanted
		local header = data:gsub("^([^\1]+)\1", "{%1}");
		session.log("debug", "Invalid opening stream header (%s)", header);
		session:close("invalid-namespace");
		return;
	end

	if error == "parse-error" then
		session.log("debug", "Client XML parse error: %s", data);
		session:close("not-well-formed");
		return;
	end

	if error ~= "stream-error" then
		return;
	end

	-- Scan the error's children for the condition element and optional text
	local condition, text = "undefined-condition", nil;
	for child in data:childtags(nil, xmlns_xmpp_streams) do
		if child.name == "text" then
			text = child:get_text();
		else
			condition = child.name;
		end
		if text and condition ~= "undefined-condition" then
			-- Both pieces found, no need to look further
			break;
		end
	end

	local description = condition;
	if text then
		description = description .. " (" .. text .. ")";
	end
	session.log("info", "Session closed by remote with error: %s", description);
	session:close(nil, description);
end
64
--- Stream callback for a complete top-level stanza.
-- The stanza is queued on the session's runner for processing.
-- @param session the session (or client) object owning the stream
-- @param stanza the parsed stanza
function stream_callbacks.handlestanza(session, stanza)
	local thread = session.thread;
	thread:run(stanza);
end
68
-- Callbacks handed to every runner created in this file.
local runner_callbacks = {};

--- Called when the function executed by a runner raises an error.
-- The error is written to the log of the object stored in self.data, which
-- is expected to be the session/client that was passed to runner().
-- The label names this module: the previous "c2s" label was inherited from
-- the c2s code and pointed at the wrong component.
-- @param err the error value (typically a traceback string)
function runner_callbacks:error(err)
	self.data.log("error", "Traceback[adminstream]: %s", err);
end
74
-- Shared attribute table for the children of <stream:error/> built in
-- session_close.
local stream_xmlns_attr = {
	xmlns = "urn:ietf:params:xml:ns:xmpp-streams";
};

--- Mark a session as destroyed and log why.
-- Calling it again on an already destroyed session does nothing.
-- @param session the session (or client) object
-- @param reason optional text for the log; a falsy value is reported as
--   "unknown reason"
local function destroy_session(session, reason)
	if not session.destroyed then
		session.destroyed = true;
		local why = reason or "unknown reason";
		session.log("debug", "Destroying session: %s", why);
	end
end
82
--- Close a session's stream and, where applicable, its connection.
-- Installed as the `close` method of both server sessions and clients.
-- @param session the session (or client) object
-- @param reason why the stream is closed:
--   nil    - no error, close initiated by us
--   false  - close initiated by the remote side
--   string - name of a stream error condition to send
--   table  - either { condition = ..., text = ..., extra = ... } or a
--            ready-made error stanza (recognised by its `name` field)
local function session_close(session, reason)
	local log = session.log or log;

	-- Short description of `reason` for the log: for table-like reasons the
	-- first of name/text/condition that is set, otherwise the reason itself.
	local function summarize()
		return (reason and (reason.name or reason.text or reason.condition)) or reason;
	end

	if not session.conn then
		-- No connection left to write to; only the session state is torn down
		destroy_session(session, summarize());
		return;
	end

	if session.notopen then
		-- Our stream header has not been sent yet; send it before closing
		session:open_stream();
	end

	if reason then -- nil == no err, initiated by us, false == initiated by client
		local stream_error = st.stanza("stream:error");
		local reason_type = type(reason);
		if reason_type == "string" then -- assume stream error
			stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
		elseif reason_type == "table" then
			if reason.condition then
				stream_error:tag(reason.condition, stream_xmlns_attr):up();
				if reason.text then
					stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
				end
				if reason.extra then
					stream_error:add_child(reason.extra);
				end
			elseif reason.name then -- a stanza
				stream_error = reason;
			end
		end
		local serialized = tostring(stream_error);
		log("debug", "Disconnecting client, <stream:error> is: %s", serialized);
		session.send(serialized);
	end

	session.send("</stream:stream>");
	-- Nothing may be written after the closing tag
	session.send = function () return false; end

	local reason_text = summarize();
	session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed");

	-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
	local conn = session.conn;
	local wait_for_remote = reason_text == nil and not session.notopen and session.type == "c2s";
	if not wait_for_remote then
		destroy_session(session, reason_text);
		conn:close();
		return;
	end

	-- Grace time to process data from authenticated cleanly-closed stream
	add_task(stream_close_timeout, function ()
		if session.destroyed then
			return;
		end
		session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
		destroy_session(session);
		conn:close();
	end);
end
137
138 --- Public methods
139
--- Create a connector for an admin socket (UNIX domain socket).
-- @param socket_path filesystem path of the socket to connect to
-- @param listeners listener table handed to server.wrapclient
-- @return table with two functions:
--   connect()    - returns true on success, or nil plus an error message
--   disconnect() - closes whatever is open and always returns true
local function new_connection(socket_path, listeners)
	-- socket.unix may be missing, or may load as something other than a
	-- table; both cases are treated as "no UNIX socket support"
	local have_unix, unix = pcall(require, "socket.unix");
	if type(unix) ~= "table" then
		have_unix = false;
	end
	local conn, sock;

	return {
		connect = function ()
			if not have_unix then
				return nil, "no unix socket support";
			end
			if sock or conn then
				return nil, "already connected";
			end
			local new_sock, create_err = unix.stream();
			if not new_sock then
				return nil, create_err;
			end
			new_sock:settimeout(0);
			local ok, err = new_sock:connect(socket_path);
			if not ok then
				-- Release the socket and keep `sock` unset, otherwise the
				-- descriptor leaks and every retry reports "already connected"
				new_sock:close();
				return nil, err;
			end
			sock = new_sock;
			conn = server.wrapclient(sock, nil, nil, listeners, "*a");
			return true;
		end;
		disconnect = function ()
			if conn then
				conn:close();
				conn = nil;
			end
			if sock then
				sock:close();
				sock = nil;
			end
			return true;
		end;
	};
end
177
--- Build the listener set for the server side of an admin stream.
-- @param sessions table used as a conn -> session map; entries are added on
--   connect and removed on disconnect
-- @param stanza_handler function(session, stanza) called from the session's
--   runner for every received stanza
-- @return table of the form { listeners = { onconnect, onincoming,
--   ondisconnect, onreadtimeout } }
local function new_server(sessions, stanza_handler)
	-- A new connection: create the session, its parser and its runner
	local function on_connect(conn)
		log("debug", "New connection");
		local session = sessionlib.new("admin");
		sessionlib.set_id(session);
		sessionlib.set_logger(session);
		sessionlib.set_conn(session, conn);

		session.conntime = gettime();
		session.type = "admin";

		local xml_stream = new_xmpp_stream(session, stream_callbacks);
		session.stream = xml_stream;
		session.notopen = true;

		-- Runs inside the runner: items are either stanzas or the
		-- { stream = "opened"/"closed" } events queued by the stream callbacks
		local function dispatch(item)
			if st.is_stanza(item) then
				stanza_handler(session, item);
				return;
			end
			local state = item.stream;
			if state == "opened" then
				stream_callbacks._streamopened(session, item.attr);
			elseif state == "closed" then
				stream_callbacks._streamclosed(session, item.attr);
			end
		end
		session.thread = runner(dispatch, runner_callbacks, session);

		-- Feed received bytes to the XML parser; unparsable input closes
		-- the session with a not-well-formed stream error
		function session.data(data)
			if not data then
				return;
			end
			local ok, err = xml_stream:feed(data);
			if ok then
				return;
			end
			session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
			session:close("not-well-formed");
		end

		session.close = session_close;

		-- Write an already serialized string to the connection
		function session.rawsend(t)
			local ret, err = conn:write(t);
			if ret then
				return true;
			end
			session.log("debug", "Error writing to connection: %s", err);
			return false, err;
		end

		-- Log a short preview (top tag of a stanza, or the text up to the
		-- first '>' of a string), then serialize and write
		function session.send(t)
			local preview = t.top_tag and t:top_tag() or t:match("^[^>]*>?");
			session.log("debug", "Sending[%s]: %s", session.type, preview);
			return session.rawsend(tostring(t));
		end

		sessions[conn] = session;
	end

	-- Incoming data is ignored for connections without a session
	local function on_incoming(conn, data)
		local session = sessions[conn];
		if not session then
			return;
		end
		session.data(data);
	end

	-- Detach the session from the connection and forget it
	local function on_disconnect(conn, err)
		local session = sessions[conn];
		if not session then
			return;
		end
		session.log("info", "Admin client disconnected: %s", err or "connection closed");
		session.conn = nil;
		sessions[conn] = nil;
	end

	-- On read timeout send a single space and hand the result of the send
	-- back to the caller
	local function on_read_timeout(conn)
		return conn:send(" ");
	end

	return {
		listeners = {
			onconnect = on_connect;
			onincoming = on_incoming;
			ondisconnect = on_disconnect;
			onreadtimeout = on_read_timeout;
		};
	};
end
259
--- Create the client side of an admin stream.
-- The returned object carries its own listener table (`client.listeners`)
-- and an event emitter (`client.events`) on which this code fires:
--   "received"         - every stanza
--   "received/<name>"  - stanzas without an xmlns attribute for which the
--                        "received" event returned a falsy value
--   "connected"        - after the remote stream header was processed
--   "disconnected"     - when the remote closed its stream
-- @return the client object
local function new_client()
	local client = {
		type = "client";
		events = events.new();
		log = log;
	};

	local listeners = {};

	-- Connection established: set up parser, runner and send functions,
	-- then open our stream
	function listeners.onconnect(conn)
		log("debug", "Connected");
		client.conn = conn;

		local stream = new_xmpp_stream(client, stream_callbacks);
		client.stream = stream;
		client.notopen = true;

		-- Items are either stanzas or the { stream = "opened"/"closed" }
		-- events queued by the stream callbacks
		client.thread = runner(function (stanza)
			if st.is_stanza(stanza) then
				if not client.events.fire_event("received", stanza) and not stanza.attr.xmlns then
					client.events.fire_event("received/"..stanza.name, stanza);
				end
			elseif stanza.stream == "opened" then
				stream_callbacks._streamopened(client, stanza.attr);
				client.events.fire_event("connected");
			elseif stanza.stream == "closed" then
				client.events.fire_event("disconnected");
				stream_callbacks._streamclosed(client, stanza.attr);
			end
		end, runner_callbacks, client);

		client.close = session_close;

		-- Log a short preview (top tag of a stanza, or the text up to the
		-- first '>' of a string), then serialize and write
		function client.send(t)
			client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
			return client.rawsend(tostring(t));
		end

		-- Write an already serialized string to the connection
		function client.rawsend(t)
			local ret, err = conn:write(t);
			if not ret then
				client.log("debug", "Error writing to connection: %s", err);
				return false, err;
			end
			return true;
		end
		client.log("debug", "Opening stream...");
		-- NOTE(review): open_stream is not defined in this file; presumably
		-- it is attached to the client by new_xmpp_stream - confirm
		client:open_stream();
	end

	-- Feed received bytes to the XML parser; unparsable input closes the
	-- stream with a not-well-formed stream error
	function listeners.onincoming(conn, data) --luacheck: ignore 212/conn
		local ok, err = client.stream:feed(data);
		if not ok then
			client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
			client:close("not-well-formed");
		end
	end

	function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn
		client.log("info", "Admin client disconnected: %s", err or "connection closed");
		client.conn = nil;
	end

	-- Keepalive: send a single space on read timeout. The result of the send
	-- is returned, exactly as the server-side listener in new_server does,
	-- so that the timeout is reported as handled; without the return value
	-- the listener always yielded nil.
	function listeners.onreadtimeout(conn)
		return conn:send(" ");
	end

	client.listeners = listeners;

	return client;
end
331
-- Module exports: the three constructors defined above.
return {
	connection = new_connection,
	server = new_server,
	client = new_client,
};