File

plugins/smacks.lua @ 505:289c866d7fb0

verse: Fix to work with server_epoll
author Kim Alvefur <zash@zash.se>
date Sat, 24 Jun 2023 09:48:23 +0200
parent 450:e72deac76e0e
line wrap: on
line source

local verse = require "verse";
local now = require"socket".gettime;

-- XML namespace of the stream management elements this plugin sends and
-- handles (<r/>, <a/>, <enable/>, <resume/>, and the "sm" stream feature).
local xmlns_sm = "urn:xmpp:sm:3";

function verse.plugins.smacks(stream)
	-- Outgoing side (all unset until an <enable/> is sent after binding):
	--   outgoing_queue   - serialized stanzas the server has not acked yet
	--   last_ack         - highest 'h' value received from the server so far
	--   last_stanza_time - timestamp at which the newest stanza was queued
	--   timer_active     - true while an ack-request task is scheduled
	local outgoing_queue, last_ack, last_stanza_time, timer_active;

	-- Incoming side: how many stanzas we have handled; stays unset until the
	-- server confirms with <enabled/>.
	local handled_stanza_count;

	-- Hook for "stanza": count every incoming stanza that is either in the
	-- "jabber:client" namespace or carries no namespace at all. Nothing is
	-- counted while handled_stanza_count is unset.
	local function incoming_stanza(stanza)
		if not handled_stanza_count then
			return;
		end
		local ns = stanza.attr.xmlns;
		if ns and ns ~= "jabber:client" then
			return;
		end
		handled_stanza_count = handled_stanza_count + 1;
		stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag());
	end

	-- Hook for "outgoing": remember every un-namespaced outgoing stanza until it
	-- is acknowledged, and make sure an ack request (<r/>) follows soon after.
	local function outgoing_stanza(stanza)
		-- NOTE: This will not behave nice if stanzas are serialized before this point
		if not outgoing_queue or not stanza.name or stanza.attr.xmlns then
			return;
		end

		-- Store the serialized form: plain strings have no .name, so re-sending
		-- them on resumption skips this hook.
		outgoing_queue[#outgoing_queue + 1] = tostring(stanza);
		last_stanza_time = now();
		if timer_active then
			return;
		end

		-- Task body: send <r/> once the stream has been quiet for a second or
		-- ten or more stanzas are waiting for an ack.
		local function request_ack()
			if #outgoing_queue == 0 then
				-- Everything was acked in the meantime; nothing to ask for
				timer_active = false;
				return;
			end
			local idle = now() - last_stanza_time;
			if idle < 1 and #outgoing_queue < 10 then
				-- Still busy: hand back the remaining delay
				-- (presumably verse.add_task re-runs the task after it - confirm)
				return 1 - idle;
			end
			stream:debug("Time up, sending <r>...");
			timer_active = false;
			stream:send(verse.stanza("r", { xmlns = xmlns_sm }));
		end

		timer_active = true;
		stream:debug("Waiting to send ack request...");
		verse.add_task(1, request_ack);
	end

	-- Hook for "disconnected": forget that the server offered stream management
	-- and, when a resumption token is held, schedule a reconnect.
	-- Returns true only when a reconnect has been scheduled.
	local function on_disconnect()
		stream:debug("smacks: connection lost");
		stream.stream_management_supported = nil;
		if not stream.resumption_token then
			return;
		end

		stream:debug("smacks: have resumption token, reconnecting in 1s...");
		stream.authenticated = nil;

		-- Falls back to the stream's own host and port 5222 when no explicit
		-- connect_host / connect_port is set.
		local function reconnect()
			stream:connect(stream.connect_host or stream.host, stream.connect_port or 5222);
		end
		verse.add_task(1, reconnect);
		return true;
	end

	-- Hook for "closed" (graceful shutdown): drop the resumption token so the
	-- "disconnected" handler does not try to reconnect and resume.
	local function on_close()
		stream.resumption_token = nil;
	end

	-- Read the 'h' attribute of an <a/> or <resumed/> element.
	-- Returns a non-negative whole number, or nil when the attribute is
	-- missing or malformed.
	local function parse_ack_count(stanza)
		local h = tonumber(stanza.attr.h);
		if h and h >= 0 and h % 1 == 0 then
			return h;
		end
		return nil;
	end

	-- Drop stanzas acknowledged by the server from the front of the queue.
	-- Requires last_ack and outgoing_queue to be set.
	-- Returns false when new_ack is lower than the last ack seen (a bad ack),
	-- true when the queue was advanced, nil when nothing changed.
	local function apply_ack(new_ack)
		if new_ack < last_ack then
			return false;
		elseif new_ack == last_ack then
			return nil;
		end
		local old_unacked = #outgoing_queue;
		-- Never iterate more often than there are queued stanzas, so an
		-- absurdly large 'h' cannot keep us spinning on an empty queue.
		local acked = math.min(new_ack - last_ack, old_unacked);
		for _ = 1, acked do
			table.remove(outgoing_queue, 1);
		end
		stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")");
		last_ack = new_ack;
		return true;
	end

	-- Hook for elements in the stream management namespace. Dispatches on the
	-- element name:
	--   r       - server asks how many stanzas we handled; answer with <a/>
	--   a       - server acknowledges stanzas we sent; trim the queue
	--   enabled - stream management is on; start counting, keep the token
	--   resumed - previous session resumed; re-send what is still unacked
	--   failed  - enable/resume failed; reset state and redo feature handling
	local function handle_sm_command(stanza)
		local name = stanza.name;
		if name == "r" then -- Request for acks for stanzas we received
			if handled_stanza_count then
				stream:debug("Ack requested... acking %d handled stanzas", handled_stanza_count);
				stream:send(verse.stanza("a", { xmlns = xmlns_sm, h = tostring(handled_stanza_count) }));
			else
				-- No counter yet (not enabled, or reset by <failed/>): there is
				-- no meaningful 'h' to report.
				stream:warn("Ack requested while stream management is not enabled, ignoring");
			end
		elseif name == "a" then -- Ack for stanzas we sent
			local new_ack = parse_ack_count(stanza);
			if not new_ack then
				stream:warn("Received ack with missing or invalid 'h' attribute, ignoring");
			elseif not last_ack then
				stream:warn("Received ack while stream management is not enabled, ignoring");
			elseif apply_ack(new_ack) == false then
				stream:warn("Received bad ack for "..new_ack.." when last ack was "..last_ack);
			end
		elseif name == "enabled" then
			handled_stanza_count = 0;
			stream.pre_smacks_features = nil;

			if stanza.attr.id then
				stream.resumption_token = stanza.attr.id;
			end
		elseif name == "resumed" then
			stream.pre_smacks_features = nil;
			local new_ack = parse_ack_count(stanza);
			if new_ack and last_ack then
				apply_ack(new_ack);
			else
				stream:warn("Resumed without a usable 'h' value, re-sending all unacked stanzas");
			end
			if outgoing_queue then
				-- Queue entries are strings, which the "outgoing" hook does not
				-- queue again. They deliberately stay in the queue: the server
				-- counts and acks the re-sent stanzas like any others, so
				-- dropping them here would let later acks remove newer,
				-- still-unacked stanzas instead.
				for i = 1, #outgoing_queue do
					stream:send(outgoing_queue[i]);
				end
			end
			stream:debug("Resumed successfully");
			stream:event("resumed");
		elseif name == "failed" then
			stream.bound = nil
			stream.smacks = nil
			last_ack = nil
			handled_stanza_count = nil

			-- TODO ack using final h value from <failed/> if present
			outgoing_queue = {}; -- TODO fire some delivery failures

			local features = stream.pre_smacks_features;
			stream.pre_smacks_features = nil;

			if features then
				-- should trigger a bind and then a new smacks session
				stream:event("stream-features", features);
			else
				-- Without saved features there is nothing to replay; firing the
				-- event with nil would break handlers that index the payload.
				stream:warn("Stream management failed and no stream features were saved, cannot restart session");
			end
		else
			stream:warn("Don't know how to handle "..xmlns_sm.."/"..stanza.name);
		end
	end

	-- Hook for "bind-success": if the server offered stream management and it
	-- has not been requested on this stream yet, reset the outgoing state and
	-- send <enable/> asking for a resumable session.
	local function on_bind_success()
		if not stream.stream_management_supported or stream.smacks then
			return;
		end
		--stream:unhook("bind-success", on_bind_success);
		stream:debug("smacks: sending enable");

		-- Start with an empty queue and nothing acked yet
		outgoing_queue, last_ack = {}, 0;
		last_stanza_time = now();

		local enable = verse.stanza("enable", { xmlns = xmlns_sm, resume = "true" });
		stream:send(enable);
		stream.smacks = true;
	end

	-- Hook for "stream-features": note that the server offers stream
	-- management and, when a previous session was already bound with smacks
	-- enabled, ask to resume it instead of continuing normally.
	-- Returns true only when a <resume/> request has been sent.
	local function on_features(features)
		if not features:get_child("sm", xmlns_sm) then
			return;
		end

		-- Keep the features around; the <failed/> handler replays them
		stream.pre_smacks_features = features;
		stream.stream_management_supported = true;

		if not (stream.smacks and stream.bound) then
			return;
		end

		-- Already enabled in a previous session - resume
		stream:debug("Resuming stream with %d handled stanzas", handled_stanza_count);
		local resume_attr = {
			xmlns = xmlns_sm,
			h = tostring(handled_stanza_count),
			previd = stream.resumption_token,
		};
		stream:send(verse.stanza("resume", resume_attr));
		return true;
	end

	-- Wire the handlers defined above into the stream's events. The optional
	-- third argument is a priority (NOTE(review): presumably higher values run
	-- earlier, and a handler returning true stops further handling - confirm
	-- against the verse event implementation).

	-- Feature negotiation, stream management elements, and enabling after bind
	stream:hook("stream-features", on_features, 250);
	stream:hook("stream/"..xmlns_sm, handle_sm_command);
	stream:hook("bind-success", on_bind_success, 1);

	-- Catch stanzas
	-- ("stanza" feeds the handled counter, "outgoing" feeds the unacked queue)
	stream:hook("stanza", incoming_stanza);
	stream:hook("outgoing", outgoing_stanza);

	-- "closed" drops the resumption token; "disconnected" reconnects if a
	-- token is still held
	stream:hook("closed", on_close, 100);
	stream:hook("disconnected", on_disconnect, 100);

	-- Disabled: no on_stream_ready handler is defined in this function
	--stream:hook("ready", on_stream_ready, 500);
end