prosody: net/http/parser.lua @ 11523:5f15ab7c6ae5
Statistics: Rewrite statistics backends to use OpenMetrics
The metric subsystem of Prosody has had some shortcomings from
the perspective of the current state-of-the-art in metric
observability.
The OpenMetrics standard [0] is a formalization of the data
model (and serialization format) of the well-known and
widely-used Prometheus [1] software stack.
The previous stats subsystem of Prosody did not map well to that
format (see e.g. [2] and [3]); the key reasons are that it tried
to do too much math on its own ([2]) and that it lacked
first-class support for "families" of metrics ([3]) and for
structured metric metadata (despite the `extra` argument to
metrics, there was no standard way of representing common things
like "tags" or "labels").
Even though OpenMetrics has grown from the Prometheus world of
monitoring, it maps well to other popular monitoring stacks
such as:
- InfluxDB (labels can be mapped to tags and fields as necessary)
- Carbon/Graphite (labels can be attached to the metric name with
dot-separation)
- StatsD (see Graphite above, assuming StatsD is backed by
  Graphite, which is the default)
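As a sketch of these mappings (names again invented), the sample
family above could be rendered for Graphite's plaintext protocol
and InfluxDB's line protocol as:

    prosody.c2s_connections.example_com 42 1618739261
    c2s_connections,host=example.com value=42

with the label folded into the dotted Graphite name in the first
case and turned into an InfluxDB tag in the second.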
The util.statsd module has been ported to use the OpenMetrics
model as a proof of concept. An implementation which exposes
the util.statistics backend data as Prometheus metrics is
ready for publishing in prosody-modules (most likely as
mod_openmetrics_prometheus to avoid breaking existing 0.11
deployments).
At the same time, the previous measure()-based API had one major
advantage: it is simple and easy to use without requiring much
knowledge of OpenMetrics or similar concepts. For that reason,
as well as for compatibility with existing code, it is preserved
and may even be extended in the future.
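A minimal sketch of that API from a module's point of view (the
metric names and the hooked event are hypothetical):

    -- module:measure(name, type) returns a callable that records samples;
    -- "rate" counts events, "times" measures durations (assumed semantics
    -- of the existing measure() API).
    local count_requests = module:measure("requests", "rate");
    local time_handling = module:measure("handling", "times");

    module:hook("example-event", function (event) -- hypothetical event name
        count_requests(); -- count one more event
        local done = time_handling(); -- start timing
        -- ... do the actual work ...
        done(); -- record the elapsed time
    end);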
However, code relying on the `stats-updated` event, as well as
on `get_stats` from `statsmanager`, will break because the data
model has changed completely. In the case of `stats-updated`,
the affected code will simply not run (the event was renamed in
order to avoid conflicts); the `get_stats` function has been
removed completely, so attempting to use it will cause a
traceback.
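Concretely, pre-existing code along these lines stops working
after this change (a sketch of the old interface, not code from
this changeset):

    local statsmanager = require "core.statsmanager";

    prosody.events.add_handler("stats-updated", function (event)
        -- this handler never fires any more: the event has been renamed
    end);

    -- get_stats no longer exists, so this call is now an
    -- "attempt to call a nil value" traceback:
    local stats = statsmanager.get_stats();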
Note that the measure_*_event methods have been removed from
the module API. I was unable to find any uses of or documentation
for them and thus deemed that they should not be ported. They can
be re-implemented if the need arises.
[0]: https://openmetrics.io/
[1]: https://prometheus.io/
[2]: #959
[3]: #960
author  Jonas Schäfer <jonas@wielicki.name>
date    Sun, 18 Apr 2021 11:47:41 +0200
parent  11184:2ede7f43ccfe
child   11727:f3aee8a825cc
local tonumber = tonumber;
local assert = assert;

local url_parse = require "socket.url".parse;
local urldecode = require "util.http".urldecode;
local dbuffer = require "util.dbuffer";

local function preprocess_path(path)
	path = urldecode((path:gsub("//+", "/")));
	if path:sub(1,1) ~= "/" then
		path = "/"..path;
	end
	local level = 0;
	for component in path:gmatch("([^/]+)/") do
		if component == ".." then
			level = level - 1;
		elseif component ~= "." then
			level = level + 1;
		end
		if level < 0 then
			return nil;
		end
	end
	return path;
end

local httpstream = {};

function httpstream.new(success_cb, error_cb, parser_type, options_cb)
	local client = true;
	if not parser_type or parser_type == "server" then client = false;
	else assert(parser_type == "client", "Invalid parser type"); end
	local bodylimit = tonumber(options_cb and options_cb().body_size_limit) or 10*1024*1024;
	-- https://stackoverflow.com/a/686243
	-- Individual headers can be up to 16k? What madness?
	local headlimit = tonumber(options_cb and options_cb().head_size_limit) or 10*1024;
	local buflimit = tonumber(options_cb and options_cb().buffer_size_limit) or bodylimit * 2;
	local buffer = dbuffer.new(buflimit);
	local chunked;
	local state = nil;
	local packet;
	local len;
	local have_body;
	local error;
	return {
		feed = function(_, data)
			if error then return nil, "parse has failed"; end
			if not data then -- EOF
				if state and client and not len then -- reading client body until EOF
					buffer:collapse();
					packet.body = buffer:read_chunk() or "";
					packet.partial = nil;
					success_cb(packet);
					state = nil;
				elseif buffer:length() ~= 0 then -- unexpected EOF
					error = true;
					return error_cb("unexpected-eof");
				end
				return;
			end
			if not buffer:write(data) then
				error = true;
				return error_cb("max-buffer-size-exceeded");
			end
			while buffer:length() > 0 do
				if state == nil then -- read request
					local index = buffer:sub(1, headlimit):find("\r\n\r\n", nil, true);
					if not index then return; end -- not enough data
					-- FIXME was reason_phrase meant to be passed on somewhere?
					local method, path, httpversion, status_code, reason_phrase; -- luacheck: ignore reason_phrase
					local first_line;
					local headers = {};
					for line in buffer:read(index+3):gmatch("([^\r\n]+)\r\n") do -- parse request
						if first_line then
							local key, val = line:match("^([^%s:]+): *(.*)$");
							if not key then error = true; return error_cb("invalid-header-line"); end -- TODO handle multi-line and invalid headers
							key = key:lower();
							headers[key] = headers[key] and headers[key]..","..val or val;
						else
							first_line = line;
							if client then
								httpversion, status_code, reason_phrase = line:match("^HTTP/(1%.[01]) (%d%d%d) (.*)$");
								status_code = tonumber(status_code);
								if not status_code then error = true; return error_cb("invalid-status-line"); end
								have_body = not
									( (options_cb and options_cb().method == "HEAD")
									or (status_code == 204 or status_code == 304 or status_code == 301)
									or (status_code >= 100 and status_code < 200) );
							else
								method, path, httpversion = line:match("^(%w+) (%S+) HTTP/(1%.[01])$");
								if not method then error = true; return error_cb("invalid-status-line"); end
							end
						end
					end
					if not first_line then error = true; return error_cb("invalid-status-line"); end
					chunked = have_body and headers["transfer-encoding"] == "chunked";
					len = tonumber(headers["content-length"]); -- TODO check for invalid len
					if client then
						-- FIXME handle '100 Continue' response (by skipping it)
						if not have_body then len = 0; end
						packet = {
							code = status_code;
							httpversion = httpversion;
							headers = headers;
							body = false;
							body_length = len;
							chunked = chunked;
							partial = true;
							-- COMPAT the properties below are deprecated
							responseversion = httpversion;
							responseheaders = headers;
						};
					else
						local parsed_url;
						if path:byte() == 47 then -- starts with /
							local _path, _query = path:match("([^?]*).?(.*)");
							if _query == "" then _query = nil; end
							parsed_url = { path = _path, query = _query };
						else
							parsed_url = url_parse(path);
							if not(parsed_url and parsed_url.path) then error = true; return error_cb("invalid-url"); end
						end
						path = preprocess_path(parsed_url.path);
						headers.host = parsed_url.host or headers.host;

						len = len or 0;
						packet = {
							method = method;
							url = parsed_url;
							path = path;
							httpversion = httpversion;
							headers = headers;
							body = false;
							body_sink = nil;
							chunked = chunked;
							partial = true;
						};
					end
					if len and len > bodylimit then
						-- Early notification, for redirection
						success_cb(packet);
						if not packet.body_sink then
							error = true;
							return error_cb("content-length-limit-exceeded");
						end
					end
					if chunked and not packet.body_sink then
						success_cb(packet);
						if not packet.body_sink then
							packet.body_buffer = dbuffer.new(buflimit);
						end
					end
					state = true;
				end
				if state then -- read body
					if chunked then
						local chunk_header = buffer:sub(1, 512); -- XXX How large do chunk headers grow?
						local chunk_size, chunk_start = chunk_header:match("^(%x+)[^\r\n]*\r\n()");
						if not chunk_size then return; end
						chunk_size = chunk_size and tonumber(chunk_size, 16);
						if not chunk_size then error = true; return error_cb("invalid-chunk-size"); end
						if chunk_size == 0 and chunk_header:find("\r\n\r\n", chunk_start-2, true) then
							local body_buffer = packet.body_buffer;
							if body_buffer then
								packet.body_buffer = nil;
								body_buffer:collapse();
								packet.body = body_buffer:read_chunk() or "";
							end

							buffer:collapse();
							local buf = buffer:read_chunk();
							buf = buf:gsub("^.-\r\n\r\n", ""); -- This ensures extensions and trailers are stripped
							buffer:write(buf);

							state, chunked = nil, nil;
							packet.partial = nil;
							success_cb(packet);
						elseif buffer:length() - chunk_start - 2 >= chunk_size then -- we have a chunk
							buffer:discard(chunk_start - 1); -- TODO verify that it's not off-by-one
							(packet.body_sink or packet.body_buffer):write(buffer:read(chunk_size));
							buffer:discard(2); -- CRLF
						else -- Partial chunk remaining
							break;
						end
					elseif packet.body_sink then
						local chunk = buffer:read_chunk(len);
						while chunk and len > 0 do
							if packet.body_sink:write(chunk) then
								len = len - #chunk;
								chunk = buffer:read_chunk(len);
							else
								error = true;
								return error_cb("body-sink-write-failure");
							end
						end
						if len == 0 then
							state = nil;
							packet.partial = nil;
							success_cb(packet);
						end
					elseif buffer:length() >= len then
						assert(not chunked)
						packet.body = buffer:read(len) or "";

						state = nil;
						packet.partial = nil;
						success_cb(packet);
					else
						break;
					end
				else
					break;
				end
			end
		end;
	};
end

return httpstream;
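For reference, a minimal sketch of how this parser is driven
(the callbacks and the request bytes are illustrative, not part
of this file):

    local httpstream = require "net.http.parser";

    local parser = httpstream.new(
        function (packet) -- success_cb: receives each parsed (possibly partial) packet
            print(packet.method, packet.path, packet.body);
        end,
        function (err) -- error_cb: receives an error token such as "invalid-status-line"
            print("parse error:", err);
        end,
        "server" -- parse requests; pass "client" to parse responses instead
    );

    -- Data may arrive in arbitrary fragments, e.g. from a socket:
    parser:feed("GET /index.html HTTP/1.1\r\n");
    parser:feed("Host: example.com\r\nContent-Length: 0\r\n\r\n");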