Software /
code /
prosody
File
plugins/mod_csi_simple.lua @ 10303:c434bff22b14
mod_csi_simple: Always remove session filters when disabling CSI
Only guard the actual pausing of outgoing data on the method existing.
This prevents the filters from lingering in case something happened to
the connection. Removing already removed filters should be a safe noop.
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Sat, 05 Oct 2019 16:55:58 +0200 |
parent | 10302:4c53e58d2468 |
child | 10414:51ebfdeccad7 |
line wrap: on
line source
-- Copyright (C) 2016-2018 Kim Alvefur -- -- This project is MIT/X11 licensed. Please see the -- COPYING file in the source package for more information. -- module:depends"csi" local jid = require "util.jid"; local st = require "util.stanza"; local dt = require "util.datetime"; local filters = require "util.filters"; local queue_size = module:get_option_number("csi_queue_size", 256); module:hook("csi-is-stanza-important", function (event) local stanza = event.stanza; if not st.is_stanza(stanza) then return true; end local st_name = stanza.name; if not st_name then return false; end local st_type = stanza.attr.type; if st_name == "presence" then if st_type == nil or st_type == "unavailable" then return false; end return true; elseif st_name == "message" then if st_type == "headline" then return false; end if stanza:get_child("sent", "urn:xmpp:carbons:2") then return true; end local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message"); if forwarded then stanza = forwarded; end if stanza:get_child("body") then return true; end if stanza:get_child("subject") then return true; end if stanza:get_child("encryption", "urn:xmpp:eme:0") then return true; end return false; end return true; end, -1); local function with_timestamp(stanza, from) if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then stanza = st.clone(stanza); stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()})); end return stanza; end local function manage_buffer(stanza, session) local ctr = session.csi_counter or 0; if ctr >= queue_size then session.log("debug", "Queue size limit hit, flushing buffer (queue size is %d)", session.csi_counter); session.conn:resume_writes(); elseif module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then session.log("debug", "Important stanza, flushing buffer (queue size is %d)", session.csi_counter); session.conn:resume_writes(); else stanza = with_timestamp(stanza, jid.join(session.username, session.host)) end session.csi_counter = ctr + 1; return stanza; end local function flush_buffer(data, session) if session.csi_flushing then return data; end session.csi_flushing = true; session.log("debug", "Client sent something, flushing buffer once (queue size is %d)", session.csi_counter); session.conn:resume_writes(); return data; end function enable_optimizations(session) if session.conn and session.conn.pause_writes then session.conn:pause_writes(); filters.add_filter(session, "stanzas/out", manage_buffer); filters.add_filter(session, "bytes/in", flush_buffer); else session.log("warn", "Session connection does not support write pausing"); end end function disable_optimizations(session) session.csi_flushing = nil; filters.remove_filter(session, "stanzas/out", manage_buffer); filters.remove_filter(session, "bytes/in", flush_buffer); if session.conn and session.conn.resume_writes then session.conn:resume_writes(); end end module:hook("csi-client-inactive", function (event) local session = event.origin; enable_optimizations(session); end); module:hook("csi-client-active", function (event) local session = event.origin; disable_optimizations(session); end); module:hook("pre-resource-unbind", function (event) local session = event.session; disable_optimizations(session); end); module:hook("c2s-ondrain", function (event) local session = event.session; if session.state == "inactive" and session.conn and session.conn.pause_writes then session.conn:pause_writes(); session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter); session.csi_counter = 0; end end); function module.load() for _, user_session in pairs(prosody.hosts[module.host].sessions) do for _, session in pairs(user_session.sessions) do if session.state == "inactive" then enable_optimizations(session); end end end end function module.unload() for _, user_session in pairs(prosody.hosts[module.host].sessions) do for _, session in pairs(user_session.sessions) do if session.state == "inactive" then disable_optimizations(session); end end end end