Software /
code /
prosody
Comparison
util/adminstream.lua @ 10855:70ac7d23673d
mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Mon, 01 Jun 2020 15:42:19 +0100 |
child | 10875:09674bbb833f |
comparison
equal
deleted
inserted
replaced
10854:472fe13a05f9 | 10855:70ac7d23673d |
---|---|
1 local st = require "util.stanza"; | |
2 local new_xmpp_stream = require "util.xmppstream".new; | |
3 local sessionlib = require "util.session"; | |
4 local gettime = require "util.time".now; | |
5 local runner = require "util.async".runner; | |
6 local add_task = require "util.timer".add_task; | |
7 local events = require "util.events"; | |
8 | |
9 local stream_close_timeout = 5; | |
10 | |
11 local log = require "util.logger".init("adminstream"); | |
12 | |
13 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | |
14 | |
15 local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" }; | |
16 | |
17 function stream_callbacks.streamopened(session, attr) | |
18 -- run _streamopened in async context | |
19 session.thread:run({ stream = "opened", attr = attr }); | |
20 end | |
21 | |
22 function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr | |
23 if session.type ~= "client" then | |
24 session:open_stream(); | |
25 end | |
26 session.notopen = nil; | |
27 end | |
28 | |
29 function stream_callbacks.streamclosed(session, attr) | |
30 -- run _streamclosed in async context | |
31 session.thread:run({ stream = "closed", attr = attr }); | |
32 end | |
33 | |
34 function stream_callbacks._streamclosed(session) | |
35 session.log("debug", "Received </stream:stream>"); | |
36 session:close(false); | |
37 end | |
38 | |
39 function stream_callbacks.error(session, error, data) | |
40 if error == "no-stream" then | |
41 session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); | |
42 session:close("invalid-namespace"); | |
43 elseif error == "parse-error" then | |
44 session.log("debug", "Client XML parse error: %s", data); | |
45 session:close("not-well-formed"); | |
46 elseif error == "stream-error" then | |
47 local condition, text = "undefined-condition"; | |
48 for child in data:childtags(nil, xmlns_xmpp_streams) do | |
49 if child.name ~= "text" then | |
50 condition = child.name; | |
51 else | |
52 text = child:get_text(); | |
53 end | |
54 if condition ~= "undefined-condition" and text then | |
55 break; | |
56 end | |
57 end | |
58 text = condition .. (text and (" ("..text..")") or ""); | |
59 session.log("info", "Session closed by remote with error: %s", text); | |
60 session:close(nil, text); | |
61 end | |
62 end | |
63 | |
64 function stream_callbacks.handlestanza(session, stanza) | |
65 session.thread:run(stanza); | |
66 end | |
67 | |
68 local runner_callbacks = {}; | |
69 | |
70 function runner_callbacks:error(err) | |
71 self.data.log("error", "Traceback[c2s]: %s", err); | |
72 end | |
73 | |
74 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | |
75 | |
76 local function destroy_session(session, reason) | |
77 if session.destroyed then return; end | |
78 session.destroyed = true; | |
79 session.log("debug", "Destroying session: %s", reason or "unknown reason"); | |
80 end | |
81 | |
82 local function session_close(session, reason) | |
83 local log = session.log or log; | |
84 if session.conn then | |
85 if session.notopen then | |
86 session:open_stream(); | |
87 end | |
88 if reason then -- nil == no err, initiated by us, false == initiated by client | |
89 local stream_error = st.stanza("stream:error"); | |
90 if type(reason) == "string" then -- assume stream error | |
91 stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); | |
92 elseif type(reason) == "table" then | |
93 if reason.condition then | |
94 stream_error:tag(reason.condition, stream_xmlns_attr):up(); | |
95 if reason.text then | |
96 stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); | |
97 end | |
98 if reason.extra then | |
99 stream_error:add_child(reason.extra); | |
100 end | |
101 elseif reason.name then -- a stanza | |
102 stream_error = reason; | |
103 end | |
104 end | |
105 stream_error = tostring(stream_error); | |
106 log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); | |
107 session.send(stream_error); | |
108 end | |
109 | |
110 session.send("</stream:stream>"); | |
111 function session.send() return false; end | |
112 | |
113 local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; | |
114 session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed"); | |
115 | |
116 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote | |
117 local conn = session.conn; | |
118 if reason_text == nil and not session.notopen and session.type == "c2s" then | |
119 -- Grace time to process data from authenticated cleanly-closed stream | |
120 add_task(stream_close_timeout, function () | |
121 if not session.destroyed then | |
122 session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); | |
123 destroy_session(session); | |
124 conn:close(); | |
125 end | |
126 end); | |
127 else | |
128 destroy_session(session, reason_text); | |
129 conn:close(); | |
130 end | |
131 else | |
132 local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; | |
133 destroy_session(session, reason_text); | |
134 end | |
135 end | |
136 | |
137 --- Public methods | |
138 | |
139 local function new_server(sessions, stanza_handler) | |
140 local listeners = {}; | |
141 | |
142 function listeners.onconnect(conn) | |
143 log("debug", "New connection"); | |
144 local session = sessionlib.new("admin"); | |
145 sessionlib.set_id(session); | |
146 sessionlib.set_logger(session); | |
147 sessionlib.set_conn(session, conn); | |
148 | |
149 session.conntime = gettime(); | |
150 session.type = "admin"; | |
151 | |
152 local stream = new_xmpp_stream(session, stream_callbacks); | |
153 session.stream = stream; | |
154 session.notopen = true; | |
155 | |
156 session.thread = runner(function (stanza) | |
157 if st.is_stanza(stanza) then | |
158 stanza_handler(session, stanza); | |
159 elseif stanza.stream == "opened" then | |
160 stream_callbacks._streamopened(session, stanza.attr); | |
161 elseif stanza.stream == "closed" then | |
162 stream_callbacks._streamclosed(session, stanza.attr); | |
163 end | |
164 end, runner_callbacks, session); | |
165 | |
166 function session.data(data) | |
167 -- Parse the data, which will store stanzas in session.pending_stanzas | |
168 if data then | |
169 local ok, err = stream:feed(data); | |
170 if not ok then | |
171 session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); | |
172 session:close("not-well-formed"); | |
173 end | |
174 end | |
175 end | |
176 | |
177 session.close = session_close; | |
178 | |
179 session.send = function (t) | |
180 session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?")); | |
181 return session.rawsend(tostring(t)); | |
182 end | |
183 | |
184 function session.rawsend(t) | |
185 local ret, err = conn:write(t); | |
186 if not ret then | |
187 session.log("debug", "Error writing to connection: %s", err); | |
188 return false, err; | |
189 end | |
190 return true; | |
191 end | |
192 | |
193 sessions[conn] = session; | |
194 end | |
195 | |
196 function listeners.onincoming(conn, data) | |
197 local session = sessions[conn]; | |
198 if session then | |
199 session.data(data); | |
200 end | |
201 end | |
202 | |
203 function listeners.ondisconnect(conn, err) | |
204 local session = sessions[conn]; | |
205 if session then | |
206 session.log("info", "Admin client disconnected: %s", err or "connection closed"); | |
207 session.conn = nil; | |
208 sessions[conn] = nil; | |
209 end | |
210 end | |
211 return { | |
212 listeners = listeners; | |
213 }; | |
214 end | |
215 | |
216 local function new_client() | |
217 local client = { | |
218 type = "client"; | |
219 events = events.new(); | |
220 log = log; | |
221 }; | |
222 | |
223 local listeners = {}; | |
224 | |
225 function listeners.onconnect(conn) | |
226 log("debug", "Connected"); | |
227 client.conn = conn; | |
228 | |
229 local stream = new_xmpp_stream(client, stream_callbacks); | |
230 client.stream = stream; | |
231 client.notopen = true; | |
232 | |
233 client.thread = runner(function (stanza) | |
234 if st.is_stanza(stanza) then | |
235 client.events.fire_event("received", stanza); | |
236 elseif stanza.stream == "opened" then | |
237 stream_callbacks._streamopened(client, stanza.attr); | |
238 client.events.fire_event("connected"); | |
239 elseif stanza.stream == "closed" then | |
240 client.events.fire_event("disconnected"); | |
241 stream_callbacks._streamclosed(client, stanza.attr); | |
242 end | |
243 end, runner_callbacks, client); | |
244 | |
245 client.close = session_close; | |
246 | |
247 function client.send(t) | |
248 client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?")); | |
249 return client.rawsend(tostring(t)); | |
250 end | |
251 | |
252 function client.rawsend(t) | |
253 local ret, err = conn:write(t); | |
254 if not ret then | |
255 client.log("debug", "Error writing to connection: %s", err); | |
256 return false, err; | |
257 end | |
258 return true; | |
259 end | |
260 client.log("debug", "Opening stream..."); | |
261 client:open_stream(); | |
262 end | |
263 | |
264 function listeners.onincoming(conn, data) --luacheck: ignore 212/conn | |
265 local ok, err = client.stream:feed(data); | |
266 if not ok then | |
267 client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); | |
268 client:close("not-well-formed"); | |
269 end | |
270 end | |
271 | |
272 function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn | |
273 client.log("info", "Admin client disconnected: %s", err or "connection closed"); | |
274 client.conn = nil; | |
275 end | |
276 | |
277 client.listeners = listeners; | |
278 | |
279 return client; | |
280 end | |
281 | |
282 return { | |
283 server = new_server; | |
284 client = new_client; | |
285 }; |