Software /
code /
prosody
Comparison
util/adminstream.lua @ 11200:bf8f2da84007
Merge 0.11->trunk
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 05 Nov 2020 22:31:25 +0100 |
parent | 10940:18e4e446a76c |
child | 12094:84fd6a79cda7 |
comparison
equal
deleted
inserted
replaced
11199:6c7c50a4de32 | 11200:bf8f2da84007 |
---|---|
1 local st = require "util.stanza"; | |
2 local new_xmpp_stream = require "util.xmppstream".new; | |
3 local sessionlib = require "util.session"; | |
4 local gettime = require "util.time".now; | |
5 local runner = require "util.async".runner; | |
6 local add_task = require "util.timer".add_task; | |
7 local events = require "util.events"; | |
8 local server = require "net.server"; | |
9 | |
10 local stream_close_timeout = 5; | |
11 | |
12 local log = require "util.logger".init("adminstream"); | |
13 | |
14 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | |
15 | |
16 local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" }; | |
17 | |
--- Stream-header callback registered in `stream_callbacks`.
-- Hands an "opened" marker (with the header attributes) to the session's
-- runner; the runner function then dispatches it to `_streamopened`.
function stream_callbacks.streamopened(session, attr)
	local opened_event = { stream = "opened", attr = attr };
	session.thread:run(opened_event);
end
22 | |
--- Runner-side handler for an opened stream.
-- Any session whose type is not "client" calls its own `open_stream()`
-- method; in every case the `notopen` flag is cleared afterwards.
function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr
	local is_client = session.type == "client";
	if not is_client then
		session:open_stream();
	end
	session.notopen = nil;
end
29 | |
--- Stream-close callback registered in `stream_callbacks`.
-- Hands a "closed" marker to the session's runner; the runner function
-- then dispatches it to `_streamclosed`.
function stream_callbacks.streamclosed(session, attr)
	local closed_event = { stream = "closed", attr = attr };
	session.thread:run(closed_event);
end
34 | |
--- Runner-side handler for a closed stream.
-- Logs the received closing tag and closes the session with reason `false`
-- (i.e. the close was initiated by the other end).
function stream_callbacks._streamclosed(session)
	local write_log = session.log;
	write_log("debug", "Received </stream:stream>");
	session:close(false);
end
39 | |
--- Stream-level error callback registered in `stream_callbacks`.
-- @param session the session the stream belongs to
-- @param error one of "no-stream", "parse-error" or "stream-error"; any
--   other value is ignored
-- @param data for "no-stream"/"parse-error" a string describing the
--   problem, for "stream-error" the received <stream:error/> stanza
function stream_callbacks.error(session, error, data)
	if error == "no-stream" then
		-- Assigning to a single local keeps only gsub's first result (the string)
		local header = data:gsub("^([^\1]+)\1", "{%1}");
		session.log("debug", "Invalid opening stream header (%s)", header);
		session:close("invalid-namespace");
		return;
	end

	if error == "parse-error" then
		session.log("debug", "Client XML parse error: %s", data);
		session:close("not-well-formed");
		return;
	end

	if error ~= "stream-error" then
		return;
	end

	-- Collect the error condition and the optional human-readable text
	-- from the children in the xmpp-streams namespace.
	local condition, text = "undefined-condition", nil;
	for child in data:childtags(nil, xmlns_xmpp_streams) do
		local child_name = child.name;
		if child_name == "text" then
			text = child:get_text();
		else
			condition = child_name;
		end
		if text and condition ~= "undefined-condition" then
			break;
		end
	end

	local description = condition;
	if text then
		description = description .. " (" .. text .. ")";
	end
	session.log("info", "Session closed by remote with error: %s", description);
	session:close(nil, description);
end
64 | |
--- Stanza callback registered in `stream_callbacks`.
-- Passes every received stanza to the session's runner for processing.
function stream_callbacks.handlestanza(session, stanza)
	local thread = session.thread;
	thread:run(stanza);
end
68 | |
-- Callbacks handed to every runner created in this module.
local runner_callbacks = {};

--- Reports an error raised inside a runner.
-- `self.data` is the session/client table that was passed to `runner()`
-- as its third argument, so its logger is used for the report.
runner_callbacks.error = function (self, err)
	local write_log = self.data.log;
	write_log("error", "Traceback[c2s]: %s", err);
end
74 | |
75 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | |
76 | |
--- Marks a session as destroyed (at most once) and logs why.
-- @param session table with a `log` function; gets `destroyed = true`
-- @param reason optional value describing why; "unknown reason" is
--   logged when it is nil or false
local function destroy_session(session, reason)
	if not session.destroyed then
		session.destroyed = true;
		session.log("debug", "Destroying session: %s", reason or "unknown reason");
	end
end
82 | |
--- Close a session's stream and, if it still has one, its connection.
-- Installed as the `close` method of both server-side sessions and the
-- client object, so it is normally invoked as `session:close(reason)`.
-- @param session the session or client table
-- @param reason nil for a clean close initiated by us, false when the
--   remote end initiated the close, a string naming a stream error
--   condition, a table with `condition` (plus optional `text` / `extra`),
--   or a ready-made stanza (any table with a `name` field)
local function session_close(session, reason)
	-- Use the session's logger when it has one, the module logger otherwise.
	local session_log = session.log or log;

	-- Short description of the reason for log output. Only tables are
	-- indexed, so unexpected reason types (e.g. `true`) cannot raise here.
	local reason_text = reason;
	if type(reason) == "table" then
		reason_text = reason.name or reason.text or reason.condition or reason;
	end

	if not session.conn then
		-- Connection already gone; nothing can be sent any more.
		destroy_session(session, reason_text);
		return;
	end

	if session.notopen then
		session:open_stream();
	end

	if reason then -- nil == no err, initiated by us, false == initiated by client
		local stream_error = st.stanza("stream:error");
		if type(reason) == "string" then -- assume stream error
			stream_error:tag(reason, stream_xmlns_attr):up();
		elseif type(reason) == "table" then
			if reason.condition then
				stream_error:tag(reason.condition, stream_xmlns_attr):up();
				if reason.text then
					stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
				end
				if reason.extra then
					stream_error:add_child(reason.extra);
				end
			elseif reason.name then -- a stanza
				stream_error = reason;
			end
		end
		stream_error = tostring(stream_error);
		session_log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
		session.send(stream_error);
	end

	session.send("</stream:stream>");
	-- Nothing may be sent after the closing tag
	function session.send() return false; end

	session_log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed");

	-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
	-- NOTE(review): the sessions created in this file have type "admin" or
	-- "client", so the "c2s" grace-time branch looks unreachable from here
	-- unless `session.type` is changed elsewhere — confirm.
	local conn = session.conn;
	if reason_text == nil and not session.notopen and session.type == "c2s" then
		-- Grace time to process data from authenticated cleanly-closed stream
		add_task(stream_close_timeout, function ()
			if not session.destroyed then
				session_log("warn", "Failed to receive a stream close response, closing connection anyway...");
				destroy_session(session);
				conn:close();
			end
		end);
	else
		destroy_session(session, reason_text);
		conn:close();
	end
end
137 | |
138 --- Public methods | |
139 | |
--- Create a connector for a UNIX domain socket.
-- @param socket_path filesystem path of the socket to connect to
-- @param listeners listener table handed to `server.wrapclient`
-- @return a table with two functions:
--   `connect()`    -> true on success, or nil plus an error message
--   `disconnect()` -> true; closes whatever is currently open
local function new_connection(socket_path, listeners)
	local have_unix, unix = pcall(require, "socket.unix");
	if type(unix) ~= "table" then
		-- require failed, or the module does not expose the table API used below
		have_unix = false;
	end
	local conn, sock;

	return {
		connect = function ()
			if not have_unix then
				return nil, "no unix socket support";
			end
			if sock or conn then
				return nil, "already connected";
			end
			local new_sock, create_err = unix.stream();
			if not new_sock then
				return nil, create_err;
			end
			new_sock:settimeout(0);
			local ok, err = new_sock:connect(socket_path);
			if not ok then
				-- Release the socket and keep `sock` unset, otherwise the
				-- descriptor leaks and every retry reports "already connected".
				new_sock:close();
				return nil, err;
			end
			sock = new_sock;
			conn = server.wrapclient(sock, nil, nil, listeners, "*a");
			return true;
		end;
		disconnect = function ()
			if conn then
				conn:close();
				conn = nil;
			end
			if sock then
				sock:close();
				sock = nil;
			end
			return true;
		end;
	};
end
177 | |
--- Build the listener set for the server side of the admin stream.
-- @param sessions table mapping connection -> session; entries are added
--   on connect and removed on disconnect
-- @param stanza_handler function(session, stanza) called from the
--   session's runner for every received stanza
-- @return a table whose `listeners` field holds the connection listeners
local function new_server(sessions, stanza_handler)
	-- Set up a fresh "admin" session for a newly accepted connection.
	local function on_connect(conn)
		log("debug", "New connection");
		local session = sessionlib.new("admin");
		sessionlib.set_id(session);
		sessionlib.set_logger(session);
		sessionlib.set_conn(session, conn);

		session.conntime = gettime();
		session.type = "admin";

		local stream = new_xmpp_stream(session, stream_callbacks);
		session.stream = stream;
		session.notopen = true;

		-- Everything queued on the runner ends up here: real stanzas go to
		-- the handler, stream open/close markers to the stream callbacks.
		local function dispatch(item)
			if st.is_stanza(item) then
				stanza_handler(session, item);
				return;
			end
			local marker = item.stream;
			if marker == "opened" then
				stream_callbacks._streamopened(session, item.attr);
			elseif marker == "closed" then
				stream_callbacks._streamclosed(session, item.attr);
			end
		end
		session.thread = runner(dispatch, runner_callbacks, session);

		-- Feed incoming bytes to the stream parser; close on invalid XML.
		function session.data(data)
			if not data then
				return;
			end
			local ok, err = stream:feed(data);
			if not ok then
				session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
				session:close("not-well-formed");
			end
		end

		session.close = session_close;

		-- Log a short preview (top tag, or text up to the first '>') and send.
		function session.send(t)
			local preview = t.top_tag and t:top_tag() or t:match("^[^>]*>?");
			session.log("debug", "Sending[%s]: %s", session.type, preview);
			return session.rawsend(tostring(t));
		end

		-- Write a string to the connection, reporting failures as false, err.
		function session.rawsend(t)
			local written, write_err = conn:write(t);
			if written then
				return true;
			end
			session.log("debug", "Error writing to connection: %s", write_err);
			return false, write_err;
		end

		sessions[conn] = session;
	end

	-- Route incoming data to the session that owns the connection, if any.
	local function on_incoming(conn, data)
		local session = sessions[conn];
		if session then
			session.data(data);
		end
	end

	-- Detach the session from a connection that has gone away.
	local function on_disconnect(conn, err)
		local session = sessions[conn];
		if not session then
			return;
		end
		session.log("info", "Admin client disconnected: %s", err or "connection closed");
		session.conn = nil;
		sessions[conn] = nil;
	end

	-- On read timeout, send a single space and report the result of the write.
	local function on_read_timeout(conn)
		return conn:send(" ");
	end

	local listeners = {
		onconnect = on_connect;
		onincoming = on_incoming;
		ondisconnect = on_disconnect;
		onreadtimeout = on_read_timeout;
	};

	return {
		listeners = listeners;
	};
end
259 | |
--- Create the client side of an admin stream.
-- The returned table has `type = "client"`, an `events` object, the module
-- logger as `log`, and a `listeners` table to be used for the connection.
-- Events fired on `client.events`:
--   "connected"       after the stream has been opened
--   "received"        for every stanza received
--   "received/<name>" only when "received" returned a falsy value and the
--                     stanza carries no `xmlns` attribute
--   "disconnected"    when the stream close marker is processed
local function new_client()
	local client = {
		type = "client";
		events = events.new();
		log = log;
	};

	local listeners = {};

	function listeners.onconnect(conn)
		log("debug", "Connected");
		client.conn = conn;

		local stream = new_xmpp_stream(client, stream_callbacks);
		client.stream = stream;
		client.notopen = true;

		-- Everything queued on the runner ends up here: stanzas are turned
		-- into events, stream open/close markers go to the stream callbacks.
		client.thread = runner(function (stanza)
			if st.is_stanza(stanza) then
				if not client.events.fire_event("received", stanza) and not stanza.attr.xmlns then
					client.events.fire_event("received/"..stanza.name, stanza);
				end
			elseif stanza.stream == "opened" then
				stream_callbacks._streamopened(client, stanza.attr);
				client.events.fire_event("connected");
			elseif stanza.stream == "closed" then
				client.events.fire_event("disconnected");
				stream_callbacks._streamclosed(client, stanza.attr);
			end
		end, runner_callbacks, client);

		client.close = session_close;

		-- Log a short preview (top tag, or text up to the first '>') and send.
		function client.send(t)
			client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
			return client.rawsend(tostring(t));
		end

		-- Write a string to the connection, reporting failures as false, err.
		function client.rawsend(t)
			local ret, err = conn:write(t);
			if not ret then
				client.log("debug", "Error writing to connection: %s", err);
				return false, err;
			end
			return true;
		end
		client.log("debug", "Opening stream...");
		client:open_stream();
	end

	function listeners.onincoming(conn, data) --luacheck: ignore 212/conn
		local ok, err = client.stream:feed(data);
		if not ok then
			client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
			client:close("not-well-formed");
		end
	end

	function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn
		client.log("info", "Admin client disconnected: %s", err or "connection closed");
		client.conn = nil;
	end

	function listeners.onreadtimeout(conn)
		-- Return the result of the keepalive write, exactly like the server
		-- listener does. Without a truthy return value the network backend
		-- sees the timeout as unhandled (Prosody's net.server then drops the
		-- connection — verify against the backend in use).
		return conn:send(" ");
	end

	client.listeners = listeners;

	return client;
end
331 | |
-- Public interface of the module: constructors for a UNIX socket
-- connector, the server-side listener set and the client object.
local exports = {
	connection = new_connection;
	server = new_server;
	client = new_client;
};

return exports;