Software /
code /
verse
File
plugins/smacks.lua @ 450:e72deac76e0e
plugins.smacks: Change to track enabled state per direction
Counting outgoing stanzas should start after <enable> is sent, while
counting incoming stanzas should start after receiving <enabled/>.
This should also help with failed resumptions.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 19 Feb 2022 15:57:24 +0100 |
parent | 449:c720f331327c |
line wrap: on
line source
local verse = require "verse";
local now = require"socket".gettime;

local xmlns_sm = "urn:xmpp:sm:3";

-- Stream Management (urn:xmpp:sm:3) plugin.
--
-- Installs hooks on `stream` that:
--   * count incoming stanzas once <enabled/> has been received,
--   * queue serialized outgoing stanzas once <enable/> has been sent and
--     periodically request acks for them with <r/>,
--   * answer <r/> requests, process <a/> acks, and handle
--     <enabled/>, <resumed/> and <failed/>,
--   * schedule a reconnect on disconnect when a resumption token is held.
--
-- @param stream the verse stream object the plugin is attached to
function verse.plugins.smacks(stream)
	-- State for outgoing stanzas.
	-- outgoing_queue and last_ack are nil until <enable/> has been sent.
	local outgoing_queue = nil;
	local last_ack = nil;
	local last_stanza_time = nil;
	local timer_active;

	-- State for incoming stanzas.
	-- nil until <enabled/> has been received (incoming counting disabled).
	local handled_stanza_count = nil;

	-- Catch incoming stanzas: count those in the default/jabber:client
	-- namespace, but only while incoming counting is enabled.
	local function incoming_stanza(stanza)
		if handled_stanza_count and (stanza.attr.xmlns == "jabber:client" or not stanza.attr.xmlns) then
			handled_stanza_count = handled_stanza_count + 1;
			stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag());
		end
	end

	-- Catch outgoing stanzas: remember them until they are acked and make
	-- sure an ack request (<r/>) is eventually sent.
	local function outgoing_stanza(stanza)
		-- NOTE: This will not behave nice if stanzas are serialized before this point
		if outgoing_queue and (stanza.name and not stanza.attr.xmlns) then
			-- serialize stanzas in order to bypass this on resumption
			outgoing_queue[#outgoing_queue+1] = tostring(stanza);
			last_stanza_time = now();
			if not timer_active then
				timer_active = true;
				stream:debug("Waiting to send ack request...");
				verse.add_task(1, function()
					-- Everything was acked in the meantime; nothing to request.
					if #outgoing_queue == 0 then
						timer_active = false;
						return;
					end
					-- Keep postponing while stanzas are still flowing, unless
					-- ten or more are already waiting for an ack.
					local time_since_last_stanza = now() - last_stanza_time;
					if time_since_last_stanza < 1 and #outgoing_queue < 10 then
						return 1 - time_since_last_stanza;
					end
					stream:debug("Time up, sending <r>...");
					timer_active = false;
					stream:send(verse.stanza("r", { xmlns = xmlns_sm }));
				end);
			end
		end
	end

	-- Connection lost: forget that the server supports stream management and,
	-- if a resumption token is held, schedule a reconnect in one second.
	-- Returns true only when a reconnect was scheduled.
	local function on_disconnect()
		stream:debug("smacks: connection lost");
		stream.stream_management_supported = nil;
		if stream.resumption_token then
			stream:debug("smacks: have resumption token, reconnecting in 1s...");
			stream.authenticated = nil;
			verse.add_task(1, function ()
				stream:connect(stream.connect_host or stream.host, stream.connect_port or 5222);
			end);
			return true;
		end
	end

	-- Graceful shutdown: drop the token so on_disconnect does not reconnect.
	local function on_close()
		stream.resumption_token = nil;
	end

	-- Apply the 'h' value carried by an <a/> or <resumed/> element: drop the
	-- newly acknowledged stanzas from the front of the outgoing queue and
	-- advance last_ack.
	--
	-- @param h_attr        raw value of the element's 'h' attribute (may be nil)
	-- @param element_name  name of the element, used in log messages
	-- @param warn_on_stale when true, warn if h is lower than the last ack
	-- @return true when h was usable (even if it acked nothing new),
	--         false when it had to be ignored
	local function apply_ack(h_attr, element_name, warn_on_stale)
		local new_ack = tonumber(h_attr);
		if not new_ack then
			-- Missing or non-numeric h; comparing nil with a number would raise.
			stream:warn("Ignoring <"..element_name.."/> with missing or invalid h attribute");
			return false;
		end
		if not (last_ack and outgoing_queue) then
			-- Outgoing tracking is not active (no <enable/> sent, or reset by <failed/>).
			stream:warn("Ignoring <"..element_name.."/> received while not tracking outgoing stanzas");
			return false;
		end
		if new_ack > last_ack then
			local old_unacked = #outgoing_queue;
			-- Never iterate more often than there are queued stanzas; removing
			-- from an empty queue is a no-op, so the result is unchanged while
			-- a bogus huge h can no longer make this loop spin.
			local to_remove = math.min(new_ack - last_ack, old_unacked);
			for _ = 1, to_remove do
				table.remove(outgoing_queue, 1);
			end
			stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")");
			last_ack = new_ack;
		elseif warn_on_stale and new_ack < last_ack then
			stream:warn("Received bad ack for "..new_ack.." when last ack was "..last_ack);
		end
		return true;
	end

	-- Dispatch on elements in the stream management namespace.
	local function handle_sm_command(stanza)
		if stanza.name == "r" then -- Request for acks for stanzas we received
			if not handled_stanza_count then
				-- Incoming counting has not started; there is no valid h to report.
				stream:warn("Received ack request before incoming stanza counting was enabled, ignoring");
				return;
			end
			stream:debug("Ack requested... acking %d handled stanzas", handled_stanza_count);
			stream:send(verse.stanza("a", { xmlns = xmlns_sm, h = tostring(handled_stanza_count) }));
		elseif stanza.name == "a" then -- Ack for stanzas we sent
			apply_ack(stanza.attr.h, "a", true);
		elseif stanza.name == "enabled" then
			-- Incoming stanzas are counted from this point on.
			handled_stanza_count = 0;
			stream.pre_smacks_features = nil;

			if stanza.attr.id then
				stream.resumption_token = stanza.attr.id;
			end
		elseif stanza.name == "resumed" then
			stream.pre_smacks_features = nil;
			-- If h is unusable the queue is left untouched and resent in full below.
			apply_ack(stanza.attr.h, "resumed", false);
			-- Resend whatever the server did not acknowledge. The entries are
			-- already-serialized strings, so outgoing_stanza does not queue them again.
			if outgoing_queue then
				for i = 1, #outgoing_queue do
					stream:send(outgoing_queue[i]);
				end
			end
			-- NOTE(review): the resent stanzas are dropped from tracking here while
			-- last_ack is kept; confirm this matches the intended h accounting.
			outgoing_queue = {};
			stream:debug("Resumed successfully");
			stream:event("resumed");
		elseif stanza.name == "failed" then
			stream.bound = nil
			stream.smacks = nil
			last_ack = nil
			handled_stanza_count = nil

			-- TODO ack using final h value from <failed/> if present
			outgoing_queue = {}; -- TODO fire some delivery failures

			local features = stream.pre_smacks_features;
			stream.pre_smacks_features = nil;

			-- should trigger a bind and then a new smacks session
			stream:event("stream-features", features);
		else
			stream:warn("Don't know how to handle "..xmlns_sm.."/"..stanza.name);
		end
	end

	-- After resource binding: enable stream management (with resumption) if the
	-- server offered it and it is not already enabled. Outgoing tracking starts
	-- here, before <enable/> itself is sent.
	local function on_bind_success()
		if stream.stream_management_supported and not stream.smacks then
			--stream:unhook("bind-success", on_bind_success);
			stream:debug("smacks: sending enable");
			outgoing_queue = {};
			last_ack = 0;
			last_stanza_time = now();
			stream:send(verse.stanza("enable", { xmlns = xmlns_sm, resume = "true" }));
			stream.smacks = true;
		end
	end

	-- On stream features: note stream management support and, if a session was
	-- enabled and bound previously, ask to resume it. Returns true only when a
	-- <resume/> was sent.
	local function on_features(features)
		if features:get_child("sm", xmlns_sm) then
			stream.pre_smacks_features = features;
			stream.stream_management_supported = true;
			if stream.smacks and stream.bound then -- Already enabled in a previous session - resume
				-- NOTE(review): handled_stanza_count is nil if <enabled/> was never
				-- received; verify that case cannot reach this point.
				stream:debug("Resuming stream with %d handled stanzas", handled_stanza_count);
				stream:send(verse.stanza("resume", { xmlns = xmlns_sm,
					h = tostring(handled_stanza_count), previd = stream.resumption_token }));
				return true;
			end
		end
	end

	stream:hook("stream-features", on_features, 250);
	stream:hook("stream/"..xmlns_sm, handle_sm_command);
	stream:hook("bind-success", on_bind_success, 1);

	-- Catch stanzas
	stream:hook("stanza", incoming_stanza);
	stream:hook("outgoing", outgoing_stanza);

	stream:hook("closed", on_close, 100);
	stream:hook("disconnected", on_disconnect, 100);
	--stream:hook("ready", on_stream_ready, 500);
end