Software / code / prosody
Comparison
util/adminstream.lua @ 11200:bf8f2da84007
Merge 0.11->trunk
| author | Kim Alvefur <zash@zash.se> |
|---|---|
| date | Thu, 05 Nov 2020 22:31:25 +0100 |
| parent | 10940:18e4e446a76c |
| child | 12094:84fd6a79cda7 |
comparison
equal
deleted
inserted
replaced
| 11199:6c7c50a4de32 | 11200:bf8f2da84007 |
|---|---|
| 1 local st = require "util.stanza"; | |
| 2 local new_xmpp_stream = require "util.xmppstream".new; | |
| 3 local sessionlib = require "util.session"; | |
| 4 local gettime = require "util.time".now; | |
| 5 local runner = require "util.async".runner; | |
| 6 local add_task = require "util.timer".add_task; | |
| 7 local events = require "util.events"; | |
| 8 local server = require "net.server"; | |
| 9 | |
| 10 local stream_close_timeout = 5; | |
| 11 | |
| 12 local log = require "util.logger".init("adminstream"); | |
| 13 | |
| 14 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; | |
| 15 | |
| 16 local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" }; | |
| 17 | |
| 18 function stream_callbacks.streamopened(session, attr) | |
| 19 -- run _streamopened in async context | |
| 20 session.thread:run({ stream = "opened", attr = attr }); | |
| 21 end | |
| 22 | |
| 23 function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr | |
| 24 if session.type ~= "client" then | |
| 25 session:open_stream(); | |
| 26 end | |
| 27 session.notopen = nil; | |
| 28 end | |
| 29 | |
| 30 function stream_callbacks.streamclosed(session, attr) | |
| 31 -- run _streamclosed in async context | |
| 32 session.thread:run({ stream = "closed", attr = attr }); | |
| 33 end | |
| 34 | |
| 35 function stream_callbacks._streamclosed(session) | |
| 36 session.log("debug", "Received </stream:stream>"); | |
| 37 session:close(false); | |
| 38 end | |
| 39 | |
| 40 function stream_callbacks.error(session, error, data) | |
| 41 if error == "no-stream" then | |
| 42 session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}"))); | |
| 43 session:close("invalid-namespace"); | |
| 44 elseif error == "parse-error" then | |
| 45 session.log("debug", "Client XML parse error: %s", data); | |
| 46 session:close("not-well-formed"); | |
| 47 elseif error == "stream-error" then | |
| 48 local condition, text = "undefined-condition"; | |
| 49 for child in data:childtags(nil, xmlns_xmpp_streams) do | |
| 50 if child.name ~= "text" then | |
| 51 condition = child.name; | |
| 52 else | |
| 53 text = child:get_text(); | |
| 54 end | |
| 55 if condition ~= "undefined-condition" and text then | |
| 56 break; | |
| 57 end | |
| 58 end | |
| 59 text = condition .. (text and (" ("..text..")") or ""); | |
| 60 session.log("info", "Session closed by remote with error: %s", text); | |
| 61 session:close(nil, text); | |
| 62 end | |
| 63 end | |
| 64 | |
| 65 function stream_callbacks.handlestanza(session, stanza) | |
| 66 session.thread:run(stanza); | |
| 67 end | |
| 68 | |
| 69 local runner_callbacks = {}; | |
| 70 | |
| 71 function runner_callbacks:error(err) | |
| 72 self.data.log("error", "Traceback[c2s]: %s", err); | |
| 73 end | |
| 74 | |
| 75 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; | |
| 76 | |
| 77 local function destroy_session(session, reason) | |
| 78 if session.destroyed then return; end | |
| 79 session.destroyed = true; | |
| 80 session.log("debug", "Destroying session: %s", reason or "unknown reason"); | |
| 81 end | |
| 82 | |
| 83 local function session_close(session, reason) | |
| 84 local log = session.log or log; | |
| 85 if session.conn then | |
| 86 if session.notopen then | |
| 87 session:open_stream(); | |
| 88 end | |
| 89 if reason then -- nil == no err, initiated by us, false == initiated by client | |
| 90 local stream_error = st.stanza("stream:error"); | |
| 91 if type(reason) == "string" then -- assume stream error | |
| 92 stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }); | |
| 93 elseif type(reason) == "table" then | |
| 94 if reason.condition then | |
| 95 stream_error:tag(reason.condition, stream_xmlns_attr):up(); | |
| 96 if reason.text then | |
| 97 stream_error:tag("text", stream_xmlns_attr):text(reason.text):up(); | |
| 98 end | |
| 99 if reason.extra then | |
| 100 stream_error:add_child(reason.extra); | |
| 101 end | |
| 102 elseif reason.name then -- a stanza | |
| 103 stream_error = reason; | |
| 104 end | |
| 105 end | |
| 106 stream_error = tostring(stream_error); | |
| 107 log("debug", "Disconnecting client, <stream:error> is: %s", stream_error); | |
| 108 session.send(stream_error); | |
| 109 end | |
| 110 | |
| 111 session.send("</stream:stream>"); | |
| 112 function session.send() return false; end | |
| 113 | |
| 114 local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; | |
| 115 session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed"); | |
| 116 | |
| 117 -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote | |
| 118 local conn = session.conn; | |
| 119 if reason_text == nil and not session.notopen and session.type == "c2s" then | |
| 120 -- Grace time to process data from authenticated cleanly-closed stream | |
| 121 add_task(stream_close_timeout, function () | |
| 122 if not session.destroyed then | |
| 123 session.log("warn", "Failed to receive a stream close response, closing connection anyway..."); | |
| 124 destroy_session(session); | |
| 125 conn:close(); | |
| 126 end | |
| 127 end); | |
| 128 else | |
| 129 destroy_session(session, reason_text); | |
| 130 conn:close(); | |
| 131 end | |
| 132 else | |
| 133 local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason; | |
| 134 destroy_session(session, reason_text); | |
| 135 end | |
| 136 end | |
| 137 | |
| 138 --- Public methods | |
| 139 | |
| 140 local function new_connection(socket_path, listeners) | |
| 141 local have_unix, unix = pcall(require, "socket.unix"); | |
| 142 if type(unix) ~= "table" then | |
| 143 have_unix = false; | |
| 144 end | |
| 145 local conn, sock; | |
| 146 | |
| 147 return { | |
| 148 connect = function () | |
| 149 if not have_unix then | |
| 150 return nil, "no unix socket support"; | |
| 151 end | |
| 152 if sock or conn then | |
| 153 return nil, "already connected"; | |
| 154 end | |
| 155 sock = unix.stream(); | |
| 156 sock:settimeout(0); | |
| 157 local ok, err = sock:connect(socket_path); | |
| 158 if not ok then | |
| 159 return nil, err; | |
| 160 end | |
| 161 conn = server.wrapclient(sock, nil, nil, listeners, "*a"); | |
| 162 return true; | |
| 163 end; | |
| 164 disconnect = function () | |
| 165 if conn then | |
| 166 conn:close(); | |
| 167 conn = nil; | |
| 168 end | |
| 169 if sock then | |
| 170 sock:close(); | |
| 171 sock = nil; | |
| 172 end | |
| 173 return true; | |
| 174 end; | |
| 175 }; | |
| 176 end | |
| 177 | |
| 178 local function new_server(sessions, stanza_handler) | |
| 179 local listeners = {}; | |
| 180 | |
| 181 function listeners.onconnect(conn) | |
| 182 log("debug", "New connection"); | |
| 183 local session = sessionlib.new("admin"); | |
| 184 sessionlib.set_id(session); | |
| 185 sessionlib.set_logger(session); | |
| 186 sessionlib.set_conn(session, conn); | |
| 187 | |
| 188 session.conntime = gettime(); | |
| 189 session.type = "admin"; | |
| 190 | |
| 191 local stream = new_xmpp_stream(session, stream_callbacks); | |
| 192 session.stream = stream; | |
| 193 session.notopen = true; | |
| 194 | |
| 195 session.thread = runner(function (stanza) | |
| 196 if st.is_stanza(stanza) then | |
| 197 stanza_handler(session, stanza); | |
| 198 elseif stanza.stream == "opened" then | |
| 199 stream_callbacks._streamopened(session, stanza.attr); | |
| 200 elseif stanza.stream == "closed" then | |
| 201 stream_callbacks._streamclosed(session, stanza.attr); | |
| 202 end | |
| 203 end, runner_callbacks, session); | |
| 204 | |
| 205 function session.data(data) | |
| 206 -- Parse the data, which will store stanzas in session.pending_stanzas | |
| 207 if data then | |
| 208 local ok, err = stream:feed(data); | |
| 209 if not ok then | |
| 210 session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); | |
| 211 session:close("not-well-formed"); | |
| 212 end | |
| 213 end | |
| 214 end | |
| 215 | |
| 216 session.close = session_close; | |
| 217 | |
| 218 session.send = function (t) | |
| 219 session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?")); | |
| 220 return session.rawsend(tostring(t)); | |
| 221 end | |
| 222 | |
| 223 function session.rawsend(t) | |
| 224 local ret, err = conn:write(t); | |
| 225 if not ret then | |
| 226 session.log("debug", "Error writing to connection: %s", err); | |
| 227 return false, err; | |
| 228 end | |
| 229 return true; | |
| 230 end | |
| 231 | |
| 232 sessions[conn] = session; | |
| 233 end | |
| 234 | |
| 235 function listeners.onincoming(conn, data) | |
| 236 local session = sessions[conn]; | |
| 237 if session then | |
| 238 session.data(data); | |
| 239 end | |
| 240 end | |
| 241 | |
| 242 function listeners.ondisconnect(conn, err) | |
| 243 local session = sessions[conn]; | |
| 244 if session then | |
| 245 session.log("info", "Admin client disconnected: %s", err or "connection closed"); | |
| 246 session.conn = nil; | |
| 247 sessions[conn] = nil; | |
| 248 end | |
| 249 end | |
| 250 | |
| 251 function listeners.onreadtimeout(conn) | |
| 252 return conn:send(" "); | |
| 253 end | |
| 254 | |
| 255 return { | |
| 256 listeners = listeners; | |
| 257 }; | |
| 258 end | |
| 259 | |
| 260 local function new_client() | |
| 261 local client = { | |
| 262 type = "client"; | |
| 263 events = events.new(); | |
| 264 log = log; | |
| 265 }; | |
| 266 | |
| 267 local listeners = {}; | |
| 268 | |
| 269 function listeners.onconnect(conn) | |
| 270 log("debug", "Connected"); | |
| 271 client.conn = conn; | |
| 272 | |
| 273 local stream = new_xmpp_stream(client, stream_callbacks); | |
| 274 client.stream = stream; | |
| 275 client.notopen = true; | |
| 276 | |
| 277 client.thread = runner(function (stanza) | |
| 278 if st.is_stanza(stanza) then | |
| 279 if not client.events.fire_event("received", stanza) and not stanza.attr.xmlns then | |
| 280 client.events.fire_event("received/"..stanza.name, stanza); | |
| 281 end | |
| 282 elseif stanza.stream == "opened" then | |
| 283 stream_callbacks._streamopened(client, stanza.attr); | |
| 284 client.events.fire_event("connected"); | |
| 285 elseif stanza.stream == "closed" then | |
| 286 client.events.fire_event("disconnected"); | |
| 287 stream_callbacks._streamclosed(client, stanza.attr); | |
| 288 end | |
| 289 end, runner_callbacks, client); | |
| 290 | |
| 291 client.close = session_close; | |
| 292 | |
| 293 function client.send(t) | |
| 294 client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?")); | |
| 295 return client.rawsend(tostring(t)); | |
| 296 end | |
| 297 | |
| 298 function client.rawsend(t) | |
| 299 local ret, err = conn:write(t); | |
| 300 if not ret then | |
| 301 client.log("debug", "Error writing to connection: %s", err); | |
| 302 return false, err; | |
| 303 end | |
| 304 return true; | |
| 305 end | |
| 306 client.log("debug", "Opening stream..."); | |
| 307 client:open_stream(); | |
| 308 end | |
| 309 | |
| 310 function listeners.onincoming(conn, data) --luacheck: ignore 212/conn | |
| 311 local ok, err = client.stream:feed(data); | |
| 312 if not ok then | |
| 313 client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300)); | |
| 314 client:close("not-well-formed"); | |
| 315 end | |
| 316 end | |
| 317 | |
| 318 function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn | |
| 319 client.log("info", "Admin client disconnected: %s", err or "connection closed"); | |
| 320 client.conn = nil; | |
| 321 end | |
| 322 | |
| 323 function listeners.onreadtimeout(conn) | |
| 324 conn:send(" "); | |
| 325 end | |
| 326 | |
| 327 client.listeners = listeners; | |
| 328 | |
| 329 return client; | |
| 330 end | |
| 331 | |
| 332 return { | |
| 333 connection = new_connection; | |
| 334 server = new_server; | |
| 335 client = new_client; | |
| 336 }; |