Software /
code /
verse
File
plugins/smacks.lua @ 445:b119dc4d8bc2
plugins.smacks: Don't warn about zero stanzas acked
It's only if the count somehow goes backwards that something is really
wrong. An ack for zero stanzas is fine and we don't need to do anything.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 10 Jun 2021 11:58:23 +0200 |
parent | 408:635cbd979d7b |
child | 446:62fb547bd31a |
line wrap: on
line source
local verse = require "verse"; local now = require"socket".gettime; local xmlns_sm = "urn:xmpp:sm:3"; function verse.plugins.smacks(stream) -- State for outgoing stanzas local outgoing_queue = {}; local last_ack = 0; local last_stanza_time = now(); local timer_active; -- State for incoming stanzas local handled_stanza_count = 0; -- Catch incoming stanzas local function incoming_stanza(stanza) if stanza.attr.xmlns == "jabber:client" or not stanza.attr.xmlns then handled_stanza_count = handled_stanza_count + 1; stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag()); end end -- Catch outgoing stanzas local function outgoing_stanza(stanza) -- NOTE: This will not behave nice if stanzas are serialized before this point if stanza.name and not stanza.attr.xmlns then -- serialize stanzas in order to bypass this on resumption outgoing_queue[#outgoing_queue+1] = tostring(stanza); last_stanza_time = now(); if not timer_active then timer_active = true; stream:debug("Waiting to send ack request..."); verse.add_task(1, function() if #outgoing_queue == 0 then timer_active = false; return; end local time_since_last_stanza = now() - last_stanza_time; if time_since_last_stanza < 1 and #outgoing_queue < 10 then return 1 - time_since_last_stanza; end stream:debug("Time up, sending <r>..."); timer_active = false; stream:send(verse.stanza("r", { xmlns = xmlns_sm })); end); end end end local function on_disconnect() stream:debug("smacks: connection lost"); stream.stream_management_supported = nil; if stream.resumption_token then stream:debug("smacks: have resumption token, reconnecting in 1s..."); stream.authenticated = nil; verse.add_task(1, function () stream:connect(stream.connect_host or stream.host, stream.connect_port or 5222); end); return true; end end -- Graceful shutdown local function on_close() stream.resumption_token = nil; stream:unhook("disconnected", on_disconnect); end local function handle_sm_command(stanza) if stanza.name == "r" then -- Request for acks for stanzas we received stream:debug("Ack requested... acking %d handled stanzas", handled_stanza_count); stream:send(verse.stanza("a", { xmlns = xmlns_sm, h = tostring(handled_stanza_count) })); elseif stanza.name == "a" then -- Ack for stanzas we sent local new_ack = tonumber(stanza.attr.h); if new_ack > last_ack then local old_unacked = #outgoing_queue; for i=last_ack+1,new_ack do table.remove(outgoing_queue, 1); end stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")"); last_ack = new_ack; elseif new_ack < last_ack then stream:warn("Received bad ack for "..new_ack.." when last ack was "..last_ack); end elseif stanza.name == "enabled" then if stanza.attr.id then stream.resumption_token = stanza.attr.id; stream:hook("closed", on_close, 100); stream:hook("disconnected", on_disconnect, 100); end elseif stanza.name == "resumed" then local new_ack = tonumber(stanza.attr.h); if new_ack > last_ack then local old_unacked = #outgoing_queue; for i=last_ack+1,new_ack do table.remove(outgoing_queue, 1); end stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")"); last_ack = new_ack; end for i=1,#outgoing_queue do stream:send(outgoing_queue[i]); end outgoing_queue = {}; stream:debug("Resumed successfully"); stream:event("resumed"); else stream:warn("Don't know how to handle "..xmlns_sm.."/"..stanza.name); end end local function on_bind_success() if not stream.smacks then --stream:unhook("bind-success", on_bind_success); stream:debug("smacks: sending enable"); stream:send(verse.stanza("enable", { xmlns = xmlns_sm, resume = "true" })); stream.smacks = true; -- Catch stanzas stream:hook("stanza", incoming_stanza); stream:hook("outgoing", outgoing_stanza); end end local function on_features(features) if features:get_child("sm", xmlns_sm) then stream.stream_management_supported = true; if stream.smacks and stream.bound then -- Already enabled in a previous session - resume stream:debug("Resuming stream with %d handled stanzas", handled_stanza_count); stream:send(verse.stanza("resume", { xmlns = xmlns_sm, h = handled_stanza_count, previd = stream.resumption_token })); return true; else stream:hook("bind-success", on_bind_success, 1); end end end stream:hook("stream-features", on_features, 250); stream:hook("stream/"..xmlns_sm, handle_sm_command); --stream:hook("ready", on_stream_ready, 500); end