Comparison

util/adminstream.lua @ 10855:70ac7d23673d

mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
author Matthew Wild <mwild1@gmail.com>
date Mon, 01 Jun 2020 15:42:19 +0100
child 10875:09674bbb833f
comparison
equal deleted inserted replaced
10854:472fe13a05f9 10855:70ac7d23673d
1 local st = require "util.stanza";
2 local new_xmpp_stream = require "util.xmppstream".new;
3 local sessionlib = require "util.session";
4 local gettime = require "util.time".now;
5 local runner = require "util.async".runner;
6 local add_task = require "util.timer".add_task;
7 local events = require "util.events";
8
9 local stream_close_timeout = 5;
10
11 local log = require "util.logger".init("adminstream");
12
13 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
14
15 local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" };
16
--- Stream callback: the opening stream element has been received.
-- Nothing is handled here directly; a marker item is queued on the session's
-- runner so that `_streamopened` is executed in async context.
-- @param session table owning the stream; must provide a `thread` runner
-- @param attr attributes passed along by the stream parser
function stream_callbacks.streamopened(session, attr)
	local item = { stream = "opened", attr = attr };
	session.thread:run(item);
end
21
--- Handles an opened stream (dispatched from the session's runner).
-- Sessions whose type is anything other than "client" call their own
-- `open_stream()` in response; every session then drops its `notopen` flag.
-- @param session table owning the stream
-- @param attr unused
function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr
	local is_client = session.type == "client";
	if not is_client then
		session:open_stream();
	end
	session.notopen = nil;
end
28
--- Stream callback: the closing stream element has been received.
-- Queues a marker item on the session's runner so that `_streamclosed`
-- is executed in async context.
-- @param session table owning the stream; must provide a `thread` runner
-- @param attr attributes passed along by the stream parser
function stream_callbacks.streamclosed(session, attr)
	local item = { stream = "closed", attr = attr };
	session.thread:run(item);
end
33
--- Handles a closed stream (dispatched from the session's runner).
-- Logs the event and closes the session, passing `false` as the reason to
-- signal that the close was initiated by the other side.
-- @param session table owning the stream
function stream_callbacks._streamclosed(session)
	local initiated_by_remote = false;
	session.log("debug", "Received </stream:stream>");
	session:close(initiated_by_remote);
end
38
--- Stream callback: the parser reported an error.
-- Three error kinds are handled; anything else is ignored.
--   "no-stream"    -> invalid stream header, closes with "invalid-namespace"
--   "parse-error"  -> malformed XML, closes with "not-well-formed"
--   "stream-error" -> the remote sent <stream:error/>; its condition and
--                     optional text are logged and the session is closed
-- @param session table owning the stream
-- @param error error kind (string)
-- @param data a string for the first two kinds, the error stanza for the third
function stream_callbacks.error(session, error, data)
	if error == "no-stream" then
		-- Only the first gsub result (the rewritten string) is wanted
		local header = data:gsub("^([^\1]+)\1", "{%1}");
		session.log("debug", "Invalid opening stream header (%s)", header);
		session:close("invalid-namespace");
		return;
	end

	if error == "parse-error" then
		session.log("debug", "Client XML parse error: %s", data);
		session:close("not-well-formed");
		return;
	end

	if error ~= "stream-error" then
		return;
	end

	-- Find the defined condition and the optional <text/> among the children
	-- in the stream-errors namespace; stop once both are known.
	local condition, text = "undefined-condition", nil;
	for child in data:childtags(nil, xmlns_xmpp_streams) do
		if child.name == "text" then
			text = child:get_text();
		else
			condition = child.name;
		end
		if text and condition ~= "undefined-condition" then
			break;
		end
	end

	local description = condition;
	if text then
		description = condition .. " (" .. text .. ")";
	end
	session.log("info", "Session closed by remote with error: %s", description);
	session:close(nil, description);
end
63
--- Stream callback: a complete stanza has been parsed.
-- The stanza is queued on the session's runner, which decides how to handle it.
-- @param session table owning the stream; must provide a `thread` runner
-- @param stanza the parsed stanza
function stream_callbacks.handlestanza(session, stanza)
	local thread = session.thread;
	thread:run(stanza);
end
67
-- Callbacks handed to every runner created in this module.
local runner_callbacks = {};

--- Called when the function executed by a runner raises an error.
-- The runner's `data` is the session/client table it was created with, so
-- the error is reported through that table's logger.
-- The tag identifies this module; it previously read "c2s", which made
-- admin-stream tracebacks look like client-to-server ones in the logs.
-- @param err the error value / traceback
function runner_callbacks:error(err)
	self.data.log("error", "Traceback[adminstream]: %s", err);
end
73
74 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
75
--- Marks a session as destroyed, at most once.
-- The first call sets `session.destroyed` and logs the reason; any later
-- call is a no-op.
-- @param session table with a `log` function
-- @param reason optional reason for the log line
local function destroy_session(session, reason)
	if not session.destroyed then
		session.destroyed = true;
		session.log("debug", "Destroying session: %s", reason or "unknown reason");
	end
end
81
--- Closes a session's stream and its connection, then destroys the session.
-- Installed as the `close` method of the sessions and clients created in
-- this module.
-- @param session the session/client table
-- @param reason why the stream is being closed:
--   nil    - no error, close initiated by us
--   false  - no error, close initiated by the other side
--   string - name of a stream error condition to send
--   table  - either { condition = ..., text = ..., extra = ... } or a
--            ready-made stanza (recognised by its `name` field)
local function session_close(session, reason)
	-- Use the session's own logger when it has one, the module logger otherwise
	local log = session.log or log;
	-- Short description of the reason for log lines and destroy_session()
	local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason;

	-- Captured up front: the disconnect listeners in this file clear
	-- session.conn, and we still want to close this connection below.
	local conn = session.conn;
	if not conn then
		-- Already disconnected, only the session state is left to clean up
		destroy_session(session, reason_text);
		return;
	end

	if session.notopen then
		-- A stream header has to go out before anything else can be sent
		session:open_stream();
	end

	if reason then
		local stream_error = st.stanza("stream:error");
		if type(reason) == "string" then -- assume stream error
			stream_error:tag(reason, stream_xmlns_attr):up();
		elseif type(reason) == "table" then
			if reason.condition then
				stream_error:tag(reason.condition, stream_xmlns_attr):up();
				if reason.text then
					stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
				end
				if reason.extra then
					stream_error:add_child(reason.extra);
				end
			elseif reason.name then -- a stanza
				stream_error = reason;
			end
		end
		stream_error = tostring(stream_error);
		log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
		session.send(stream_error);
	end

	session.send("</stream:stream>");
	-- Nothing may be sent after the stream has been closed
	function session.send() return false; end

	log("debug", "Admin stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed");

	-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
	-- NOTE(review): the sessions created in this file have type "admin" or
	-- "client", so this branch is not taken for them - confirm whether the
	-- grace period is wanted for admin streams.
	if reason_text == nil and not session.notopen and session.type == "c2s" then
		-- Grace time to process data from authenticated cleanly-closed stream
		add_task(stream_close_timeout, function ()
			if not session.destroyed then
				log("warn", "Failed to receive a stream close response, closing connection anyway...");
				destroy_session(session);
				conn:close();
			end
		end);
	else
		destroy_session(session, reason_text);
		conn:close();
	end
end
136
137 --- Public methods
138
--- Creates the server side of an admin stream.
-- @param sessions table used as a conn -> session map; entries are added on
--        connect and removed on disconnect
-- @param stanza_handler function(session, stanza) called for every stanza
--        received from a connected client
-- @return a table with a single field, `listeners`, holding the
--         onconnect/onincoming/ondisconnect connection callbacks
local function new_server(sessions, stanza_handler)

	-- Sets up a fresh "admin" session for a newly accepted connection
	local function on_connect(conn)
		log("debug", "New connection");
		local session = sessionlib.new("admin");
		sessionlib.set_id(session);
		sessionlib.set_logger(session);
		sessionlib.set_conn(session, conn);

		session.conntime = gettime();
		session.type = "admin";

		local stream = new_xmpp_stream(session, stream_callbacks);
		session.stream = stream;
		session.notopen = true;

		-- Everything the stream callbacks queue ends up here: either a real
		-- stanza or a { stream = "opened"/"closed" } marker item.
		session.thread = runner(function (item)
			if st.is_stanza(item) then
				stanza_handler(session, item);
				return;
			end
			local state = item.stream;
			if state == "opened" then
				stream_callbacks._streamopened(session, item.attr);
			elseif state == "closed" then
				stream_callbacks._streamclosed(session, item.attr);
			end
		end, runner_callbacks, session);

		-- Feeds raw bytes to the stream parser, which invokes the stream
		-- callbacks; a failed feed closes the session as not-well-formed.
		function session.data(data)
			if not data then
				return;
			end
			local ok, err = stream:feed(data);
			if ok then
				return;
			end
			session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
			session:close("not-well-formed");
		end

		session.close = session_close;

		-- Writes an already serialized string to the connection
		function session.rawsend(t)
			local ret, err = conn:write(t);
			if ret then
				return true;
			end
			session.log("debug", "Error writing to connection: %s", err);
			return false, err;
		end

		-- Logs a short preview (top tag of a stanza, or the leading tag of a
		-- string) and sends the serialized value.
		function session.send(t)
			local preview = t.top_tag and t:top_tag() or t:match("^[^>]*>?");
			session.log("debug", "Sending[%s]: %s", session.type, preview);
			return session.rawsend(tostring(t));
		end

		sessions[conn] = session;
	end

	-- Hands incoming data to the session belonging to the connection, if any
	local function on_incoming(conn, data)
		local session = sessions[conn];
		if not session then
			return;
		end
		session.data(data);
	end

	-- Detaches the session from the connection and forgets it
	local function on_disconnect(conn, err)
		local session = sessions[conn];
		if not session then
			return;
		end
		session.log("info", "Admin client disconnected: %s", err or "connection closed");
		session.conn = nil;
		sessions[conn] = nil;
	end

	return {
		listeners = {
			onconnect = on_connect;
			onincoming = on_incoming;
			ondisconnect = on_disconnect;
		};
	};
end
215
--- Creates the client side of an admin stream.
-- The returned table has type "client", an `events` object and a
-- `listeners` table with onconnect/onincoming/ondisconnect callbacks.
-- Events fired on `client.events`:
--   "received"     - a stanza arrived (the stanza is the event payload)
--   "connected"    - the stream has been opened
--   "disconnected" - the stream was closed by the other side or the
--                    connection went away; fired at most once per connection
-- @return the client table
local function new_client()
	local client = {
		type = "client";
		events = events.new();
		log = log;
	};

	-- "disconnected" can be triggered both by a received stream close and by
	-- the connection's disconnect callback; this flag makes sure listeners
	-- see it only once per connection.
	local disconnect_notified = false;

	local function notify_disconnected()
		if disconnect_notified then
			return;
		end
		disconnect_notified = true;
		client.events.fire_event("disconnected");
	end

	local listeners = {};

	function listeners.onconnect(conn)
		log("debug", "Connected");
		client.conn = conn;
		disconnect_notified = false;

		local stream = new_xmpp_stream(client, stream_callbacks);
		client.stream = stream;
		client.notopen = true;

		-- Everything the stream callbacks queue ends up here: either a real
		-- stanza or a { stream = "opened"/"closed" } marker item.
		client.thread = runner(function (stanza)
			if st.is_stanza(stanza) then
				client.events.fire_event("received", stanza);
			elseif stanza.stream == "opened" then
				stream_callbacks._streamopened(client, stanza.attr);
				client.events.fire_event("connected");
			elseif stanza.stream == "closed" then
				notify_disconnected();
				stream_callbacks._streamclosed(client, stanza.attr);
			end
		end, runner_callbacks, client);

		client.close = session_close;

		-- Logs a short preview (top tag of a stanza, or the leading tag of a
		-- string) and sends the serialized value.
		function client.send(t)
			client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
			return client.rawsend(tostring(t));
		end

		-- Writes an already serialized string to the connection
		function client.rawsend(t)
			local ret, err = conn:write(t);
			if not ret then
				client.log("debug", "Error writing to connection: %s", err);
				return false, err;
			end
			return true;
		end
		client.log("debug", "Opening stream...");
		client:open_stream();
	end

	function listeners.onincoming(conn, data) --luacheck: ignore 212/conn
		local ok, err = client.stream:feed(data);
		if not ok then
			client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
			client:close("not-well-formed");
		end
	end

	function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn
		client.log("info", "Admin client disconnected: %s", err or "connection closed");
		client.conn = nil;
		-- Without this, a connection dropped without a stream close would
		-- never tell listeners that the client is gone.
		notify_disconnected();
	end

	client.listeners = listeners;

	return client;
end
281
-- Module exports: constructors for the two ends of an admin stream
local exports = {};
exports.server = new_server;
exports.client = new_client;
return exports;