Software /
code /
prosody
File
plugins/mod_s2s/s2sout.lib.lua @ 9751:39ee70fbb009
mod_mam: Perform message expiry based on building an index by date
For each day, store a set of users that have new messages. To expire
messages, we collect the union of sets of users from dates that fall
outside the cleanup range.
The previous algoritm did not work well with many users, especially with
the default settings.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 03 Jan 2019 17:25:43 +0100 |
parent | 9414:a48579a7b709 |
child | 9871:744e08ac5596 |
line wrap: on
line source
-- Prosody IM -- Copyright (C) 2008-2010 Matthew Wild -- Copyright (C) 2008-2010 Waqas Hussain -- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- --- Module containing all the logic for connecting to a remote server -- luacheck: ignore 432/err local portmanager = require "core.portmanager"; local wrapclient = require "net.server".wrapclient; local initialize_filters = require "util.filters".initialize; local idna_to_ascii = require "util.encodings".idna.to_ascii; local new_ip = require "util.ip".new_ip; local rfc6724_dest = require "util.rfc6724".destination; local socket = require "socket"; local adns = require "net.adns"; local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs; local local_addresses = require "util.net".local_addresses; local s2s_destroy_session = require "core.s2smanager".destroy_session; local default_mode = module:get_option("network_default_read_size", 4096); local log = module._log; local sources = {}; local has_ipv4, has_ipv6; local dns_timeout = module:get_option_number("dns_timeout", 15); local resolvers = module:get_option_set("s2s_dns_resolvers") local s2sout = {}; local s2s_listener; function s2sout.set_listener(listener) s2s_listener = listener; end local function compare_srv_priorities(a,b) return a.priority < b.priority or (a.priority == b.priority and a.weight > b.weight); end function s2sout.initiate_connection(host_session) local log = host_session.log or log; initialize_filters(host_session); host_session.version = 1; host_session.resolver = adns.resolver(); host_session.resolver._resolver:settimeout(dns_timeout); if resolvers then for resolver in resolvers do host_session.resolver._resolver:addnameserver(resolver); end end -- Kick the connection attempting machine into life if not s2sout.attempt_connection(host_session) then -- Intentionally not returning here, the -- session is needed, connected or not s2s_destroy_session(host_session); end if not host_session.sends2s then -- A sends2s which buffers data (until the stream is opened) -- note that data in this buffer will be sent before the stream is authed -- and will not be ack'd in any way, successful or otherwise local buffer; function host_session.sends2s(data) if not buffer then buffer = {}; host_session.send_buffer = buffer; end log("debug", "Buffering data on unconnected s2sout to %s", host_session.to_host); buffer[#buffer+1] = data; log("debug", "Buffered item %d: %s", #buffer, data); end end end function s2sout.attempt_connection(host_session, err) local to_host = host_session.to_host; local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269; local log = host_session.log or log; if not connect_host then return false; end if not err then -- This is our first attempt log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host); host_session.connecting = true; host_session.resolver:lookup(function (answer) local srv_hosts = { answer = answer }; host_session.srv_hosts = srv_hosts; host_session.srv_choice = 0; host_session.connecting = nil; if answer and #answer > 0 then log("debug", "%s has SRV records, handling...", to_host); for _, record in ipairs(answer) do t_insert(srv_hosts, record.srv); end if #srv_hosts == 1 and srv_hosts[1].target == "." then log("debug", "%s does not provide a XMPP service", to_host); s2s_destroy_session(host_session, err); -- Nothing to see here return; end t_sort(srv_hosts, compare_srv_priorities); local srv_choice = srv_hosts[1]; host_session.srv_choice = 1; if srv_choice then connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port); end else log("debug", "%s has no SRV records, falling back to A/AAAA", to_host); end -- Try with SRV, or just the plain hostname if no SRV local ok, err = s2sout.try_connect(host_session, connect_host, connect_port); if not ok then if not s2sout.attempt_connection(host_session, err) then -- No more attempts will be made s2s_destroy_session(host_session, err); end end end, "_xmpp-server._tcp."..connect_host..".", "SRV"); return true; -- Attempt in progress elseif host_session.ip_hosts then return s2sout.try_connect(host_session, connect_host, connect_port, err); elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV host_session.srv_choice = host_session.srv_choice + 1; local srv_choice = host_session.srv_hosts[host_session.srv_choice]; connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port; host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", err, host_session.srv_choice, connect_host, connect_port); else host_session.log("info", "Failed in all attempts to connect to %s", host_session.to_host); -- We're out of options return false; end if not (connect_host and connect_port) then -- Likely we couldn't resolve DNS log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", connect_host, connect_port, to_host); return false; end return s2sout.try_connect(host_session, connect_host, connect_port); end function s2sout.try_next_ip(host_session) host_session.connecting = nil; host_session.ip_choice = host_session.ip_choice + 1; local ip = host_session.ip_hosts[host_session.ip_choice]; local ok, err= s2sout.make_connect(host_session, ip.ip, ip.port); if not ok then if not s2sout.attempt_connection(host_session, err or "closed") then err = err and (": "..err) or ""; s2s_destroy_session(host_session, "Connection failed"..err); end end end function s2sout.try_connect(host_session, connect_host, connect_port, err) host_session.connecting = true; local log = host_session.log or log; if not err then local IPs = {}; host_session.ip_hosts = IPs; -- luacheck: ignore 231/handle4 231/handle6 local handle4, handle6; local have_other_result = not(has_ipv4) or not(has_ipv6) or false; if has_ipv4 then handle4 = host_session.resolver:lookup(function (reply, err) handle4 = nil; if reply and reply[#reply] and reply[#reply].a then for _, ip in ipairs(reply) do log("debug", "DNS reply for %s gives us %s", connect_host, ip.a); IPs[#IPs+1] = new_ip(ip.a, "IPv4"); end elseif err then log("debug", "Error in DNS lookup: %s", err); end if have_other_result then if #IPs > 0 then rfc6724_dest(host_session.ip_hosts, sources); for i = 1, #IPs do IPs[i] = {ip = IPs[i], port = connect_port}; end host_session.ip_choice = 0; s2sout.try_next_ip(host_session); else log("debug", "DNS lookup failed to get a response for %s", connect_host); host_session.ip_hosts = nil; if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can log("debug", "No other records to try for %s - destroying", host_session.to_host); err = err and (": "..err) or ""; s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't end end else have_other_result = true; end end, connect_host, "A", "IN"); else have_other_result = true; end if has_ipv6 then handle6 = host_session.resolver:lookup(function (reply, err) handle6 = nil; if reply and reply[#reply] and reply[#reply].aaaa then for _, ip in ipairs(reply) do log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa); IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6"); end elseif err then log("debug", "Error in DNS lookup: %s", err); end if have_other_result then if #IPs > 0 then rfc6724_dest(host_session.ip_hosts, sources); for i = 1, #IPs do IPs[i] = {ip = IPs[i], port = connect_port}; end host_session.ip_choice = 0; s2sout.try_next_ip(host_session); else log("debug", "DNS lookup failed to get a response for %s", connect_host); host_session.ip_hosts = nil; if not s2sout.attempt_connection(host_session, "name resolution failed") then -- Retry if we can log("debug", "No other records to try for %s - destroying", host_session.to_host); err = err and (": "..err) or ""; s2s_destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't end end else have_other_result = true; end end, connect_host, "AAAA", "IN"); else have_other_result = true; end return true; elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try s2sout.try_next_ip(host_session); else log("debug", "Out of IP addresses, trying next SRV record (if any)"); host_session.ip_hosts = nil; if not s2sout.attempt_connection(host_session, "out of IP addresses") then -- Retry if we can log("debug", "No other records to try for %s - destroying", host_session.to_host); err = err and (": "..err) or ""; s2s_destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't return false; end end return true; end function s2sout.make_connect(host_session, connect_host, connect_port) local log = host_session.log or log; log("debug", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port); -- Reset secure flag in case this is another -- connection attempt after a failed STARTTLS host_session.secure = nil; host_session.encrypted = nil; local conn, handler; local proto = connect_host.proto; if proto == "IPv4" then conn, handler = socket.tcp(); elseif proto == "IPv6" and socket.tcp6 then conn, handler = socket.tcp6(); else handler = "Unsupported protocol: "..tostring(proto); end if not conn then log("warn", "Failed to create outgoing connection, system error: %s", handler); return false, handler; end conn:settimeout(0); local success, err = conn:connect(connect_host.addr, connect_port); if not success and err ~= "timeout" then log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err); return false, err; end conn = wrapclient(conn, connect_host.addr, connect_port, s2s_listener, default_mode); host_session.conn = conn; -- Register this outgoing connection so that xmppserver_listener knows about it -- otherwise it will assume it is a new incoming connection s2s_listener.register_outgoing(conn, host_session); log("debug", "Connection attempt in progress..."); return true; end module:hook_global("service-added", function (event) if event.name ~= "s2s" then return end local s2s_sources = portmanager.get_active_services():get("s2s"); if not s2s_sources then module:log("warn", "s2s not listening on any ports, outgoing connections may fail"); return; end for source, _ in pairs(s2s_sources) do if source == "*" or source == "0.0.0.0" then for _, addr in ipairs(local_addresses("ipv4", true)) do sources[#sources + 1] = new_ip(addr, "IPv4"); end elseif source == "::" then for _, addr in ipairs(local_addresses("ipv6", true)) do sources[#sources + 1] = new_ip(addr, "IPv6"); end else sources[#sources + 1] = new_ip(source, (source:find(":") and "IPv6") or "IPv4"); end end for i = 1,#sources do if sources[i].proto == "IPv6" then has_ipv6 = true; elseif sources[i].proto == "IPv4" then has_ipv4 = true; end end if not (has_ipv4 or has_ipv6) then module:log("warn", "No local IPv4 or IPv6 addresses detected, outgoing connections may fail"); end end); return s2sout;