Software / code / prosody
Comparison
util/adminstream.lua @ 10855:70ac7d23673d
mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
| author | Matthew Wild <mwild1@gmail.com> |
|---|---|
| date | Mon, 01 Jun 2020 15:42:19 +0100 |
| child | 10875:09674bbb833f |
comparison
equal
deleted
inserted
replaced
| 10854:472fe13a05f9 | 10855:70ac7d23673d |
|---|---|
| 1 local st = require "util.stanza"; | |
| 2 local new_xmpp_stream = require "util.xmppstream".new; | |
| 3 local sessionlib = require "util.session"; | |
| 4 local gettime = require "util.time".now; | |
| 5 local runner = require "util.async".runner; | |
| 6 local add_task = require "util.timer".add_task; | |
| 7 local events = require "util.events"; | |
| 8 | |
| 9 local stream_close_timeout = 5; | |
| 10 | |
| 11 local log = require "util.logger".init("adminstream"); | |
| 12 | |
| 13 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | |
| 14 | |
| 15 local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" }; | |
| 16 | |
| 17 function stream_callbacks.streamopened(session, attr) | |
| 18 -- run _streamopened in async context | |
| 19 session.thread:run({ stream = "opened", attr = attr }); | |
| 20 end | |
| 21 | |
| 22 function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr | |
| 23 if session.type ~= "client" then | |
| 24 session:open_stream(); | |
| 25 end | |
| 26 session.notopen = nil; | |
| 27 end | |
| 28 | |
| 29 function stream_callbacks.streamclosed(session, attr) | |
| 30 -- run _streamclosed in async context | |
| 31 session.thread:run({ stream = "closed", attr = attr }); | |
| 32 end | |
| 33 | |
| 34 function stream_callbacks._streamclosed(session) | |
| 35 session.log("debug", "Received </stream:stream>"); | |
| 36 session:close(false); | |
| 37 end | |
| 38 | |
| 39 function stream_callbacks.error(session, error, data) | |
| 40 if error == "no-stream" then | |
| 41 session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); | |
| 42 session:close("invalid-namespace"); | |
| 43 elseif error == "parse-error" then | |
| 44 session.log("debug", "Client XML parse error: %s", data); | |
| 45 session:close("not-well-formed"); | |
| 46 elseif error == "stream-error" then | |
| 47 local condition, text = "undefined-condition"; | |
| 48 for child in data:childtags(nil, xmlns_xmpp_streams) do | |
| 49 if child.name ~= "text" then | |
| 50 condition = child.name; | |
| 51 else | |
| 52 text = child:get_text(); | |
| 53 end | |
| 54 if condition ~= "undefined-condition" and text then | |
| 55 break; | |
| 56 end | |
| 57 end | |
| 58 text = condition .. (text and (" ("..text..")") or ""); | |
| 59 session.log("info", "Session closed by remote with error: %s", text); | |
| 60 session:close(nil, text); | |
| 61 end | |
| 62 end | |
| 63 | |
| 64 function stream_callbacks.handlestanza(session, stanza) | |
| 65 session.thread:run(stanza); | |
| 66 end | |
| 67 | |
| 68 local runner_callbacks = {}; | |
| 69 | |
| 70 function runner_callbacks:error(err) | |
| 71 self.data.log("error", "Traceback[c2s]: %s", err); | |
| 72 end | |
| 73 | |
| 74 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | |
| 75 | |
| 76 local function destroy_session(session, reason) | |
| 77 if session.destroyed then return; end | |
| 78 session.destroyed = true; | |
| 79 session.log("debug", "Destroying session: %s", reason or "unknown reason"); | |
| 80 end | |
| 81 | |
| 82 local function session_close(session, reason) | |
| 83 local log = session.log or log; | |
| 84 if session.conn then | |
| 85 if session.notopen then | |
| 86 session:open_stream(); | |
| 87 end | |
| 88 if reason then -- nil == no err, initiated by us, false == initiated by client | |
| 89 local stream_error = st.stanza("stream:error"); | |
| 90 if type(reason) == "string" then -- assume stream error | |
| 91 stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); | |
| 92 elseif type(reason) == "table" then | |
| 93 if reason.condition then | |
| 94 stream_error:tag(reason.condition, stream_xmlns_attr):up(); | |
| 95 if reason.text then | |
| 96 stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); | |
| 97 end | |
| 98 if reason.extra then | |
| 99 stream_error:add_child(reason.extra); | |
| 100 end | |
| 101 elseif reason.name then -- a stanza | |
| 102 stream_error = reason; | |
| 103 end | |
| 104 end | |
| 105 stream_error = tostring(stream_error); | |
| 106 log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); | |
| 107 session.send(stream_error); | |
| 108 end | |
| 109 | |
| 110 session.send("</stream:stream>"); | |
| 111 function session.send() return false; end | |
| 112 | |
| 113 local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; | |
| 114 session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed"); | |
| 115 | |
| 116 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote | |
| 117 local conn = session.conn; | |
| 118 if reason_text == nil and not session.notopen and session.type == "c2s" then | |
| 119 -- Grace time to process data from authenticated cleanly-closed stream | |
| 120 add_task(stream_close_timeout, function () | |
| 121 if not session.destroyed then | |
| 122 session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); | |
| 123 destroy_session(session); | |
| 124 conn:close(); | |
| 125 end | |
| 126 end); | |
| 127 else | |
| 128 destroy_session(session, reason_text); | |
| 129 conn:close(); | |
| 130 end | |
| 131 else | |
| 132 local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; | |
| 133 destroy_session(session, reason_text); | |
| 134 end | |
| 135 end | |
| 136 | |
| 137 --- Public methods | |
| 138 | |
| 139 local function new_server(sessions, stanza_handler) | |
| 140 local listeners = {}; | |
| 141 | |
| 142 function listeners.onconnect(conn) | |
| 143 log("debug", "New connection"); | |
| 144 local session = sessionlib.new("admin"); | |
| 145 sessionlib.set_id(session); | |
| 146 sessionlib.set_logger(session); | |
| 147 sessionlib.set_conn(session, conn); | |
| 148 | |
| 149 session.conntime = gettime(); | |
| 150 session.type = "admin"; | |
| 151 | |
| 152 local stream = new_xmpp_stream(session, stream_callbacks); | |
| 153 session.stream = stream; | |
| 154 session.notopen = true; | |
| 155 | |
| 156 session.thread = runner(function (stanza) | |
| 157 if st.is_stanza(stanza) then | |
| 158 stanza_handler(session, stanza); | |
| 159 elseif stanza.stream == "opened" then | |
| 160 stream_callbacks._streamopened(session, stanza.attr); | |
| 161 elseif stanza.stream == "closed" then | |
| 162 stream_callbacks._streamclosed(session, stanza.attr); | |
| 163 end | |
| 164 end, runner_callbacks, session); | |
| 165 | |
| 166 function session.data(data) | |
| 167 -- Parse the data, which will store stanzas in session.pending_stanzas | |
| 168 if data then | |
| 169 local ok, err = stream:feed(data); | |
| 170 if not ok then | |
| 171 session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); | |
| 172 session:close("not-well-formed"); | |
| 173 end | |
| 174 end | |
| 175 end | |
| 176 | |
| 177 session.close = session_close; | |
| 178 | |
| 179 session.send = function (t) | |
| 180 session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?")); | |
| 181 return session.rawsend(tostring(t)); | |
| 182 end | |
| 183 | |
| 184 function session.rawsend(t) | |
| 185 local ret, err = conn:write(t); | |
| 186 if not ret then | |
| 187 session.log("debug", "Error writing to connection: %s", err); | |
| 188 return false, err; | |
| 189 end | |
| 190 return true; | |
| 191 end | |
| 192 | |
| 193 sessions[conn] = session; | |
| 194 end | |
| 195 | |
| 196 function listeners.onincoming(conn, data) | |
| 197 local session = sessions[conn]; | |
| 198 if session then | |
| 199 session.data(data); | |
| 200 end | |
| 201 end | |
| 202 | |
| 203 function listeners.ondisconnect(conn, err) | |
| 204 local session = sessions[conn]; | |
| 205 if session then | |
| 206 session.log("info", "Admin client disconnected: %s", err or "connection closed"); | |
| 207 session.conn = nil; | |
| 208 sessions[conn] = nil; | |
| 209 end | |
| 210 end | |
| 211 return { | |
| 212 listeners = listeners; | |
| 213 }; | |
| 214 end | |
| 215 | |
| 216 local function new_client() | |
| 217 local client = { | |
| 218 type = "client"; | |
| 219 events = events.new(); | |
| 220 log = log; | |
| 221 }; | |
| 222 | |
| 223 local listeners = {}; | |
| 224 | |
| 225 function listeners.onconnect(conn) | |
| 226 log("debug", "Connected"); | |
| 227 client.conn = conn; | |
| 228 | |
| 229 local stream = new_xmpp_stream(client, stream_callbacks); | |
| 230 client.stream = stream; | |
| 231 client.notopen = true; | |
| 232 | |
| 233 client.thread = runner(function (stanza) | |
| 234 if st.is_stanza(stanza) then | |
| 235 client.events.fire_event("received", stanza); | |
| 236 elseif stanza.stream == "opened" then | |
| 237 stream_callbacks._streamopened(client, stanza.attr); | |
| 238 client.events.fire_event("connected"); | |
| 239 elseif stanza.stream == "closed" then | |
| 240 client.events.fire_event("disconnected"); | |
| 241 stream_callbacks._streamclosed(client, stanza.attr); | |
| 242 end | |
| 243 end, runner_callbacks, client); | |
| 244 | |
| 245 client.close = session_close; | |
| 246 | |
| 247 function client.send(t) | |
| 248 client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?")); | |
| 249 return client.rawsend(tostring(t)); | |
| 250 end | |
| 251 | |
| 252 function client.rawsend(t) | |
| 253 local ret, err = conn:write(t); | |
| 254 if not ret then | |
| 255 client.log("debug", "Error writing to connection: %s", err); | |
| 256 return false, err; | |
| 257 end | |
| 258 return true; | |
| 259 end | |
| 260 client.log("debug", "Opening stream..."); | |
| 261 client:open_stream(); | |
| 262 end | |
| 263 | |
| 264 function listeners.onincoming(conn, data) --luacheck: ignore 212/conn | |
| 265 local ok, err = client.stream:feed(data); | |
| 266 if not ok then | |
| 267 client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); | |
| 268 client:close("not-well-formed"); | |
| 269 end | |
| 270 end | |
| 271 | |
| 272 function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn | |
| 273 client.log("info", "Admin client disconnected: %s", err or "connection closed"); | |
| 274 client.conn = nil; | |
| 275 end | |
| 276 | |
| 277 client.listeners = listeners; | |
| 278 | |
| 279 return client; | |
| 280 end | |
| 281 | |
| 282 return { | |
| 283 server = new_server; | |
| 284 client = new_client; | |
| 285 }; |