# HG changeset patch # User Kim Alvefur # Date 1413113090 -7200 # Node ID 2881d532f385a02d4c0fd2051b49ac2ba5f06736 # Parent 67c80abe742ed8d88389fd1e03263f9f4c2c5143 mod_smacks: Use filters for queuing outgoing stanzas instead of wrapping session.send() diff -r 67c80abe742e -r 2881d532f385 mod_smacks/mod_smacks.lua --- a/mod_smacks/mod_smacks.lua Thu Oct 09 15:08:05 2014 +0200 +++ b/mod_smacks/mod_smacks.lua Sun Oct 12 13:24:50 2014 +0200 @@ -68,8 +68,47 @@ end end); -local function wrap_session(session, resume, xmlns_sm) - local sm_attr = { xmlns = xmlns_sm }; +local function outgoing_stanza_filter(stanza, session) + local is_stanza = stanza.attr and not stanza.attr.xmlns; + if is_stanza and not stanza._cached then -- Stanza in default stream namespace + local queue = session.outgoing_stanza_queue; + module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not"); + local cached_stanza = st.clone(stanza); + cached_stanza._cached = true; + + if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then + cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()}); + end + + queue[#queue+1] = cached_stanza; + session.log("debug", "#queue = %d", #queue); + if #queue > max_unacked_stanzas then + module:add_timer(0, function () + if not session.awaiting_ack then + session.awaiting_ack = true; + session.send(st.stanza("r", { xmlns = session.smacks })); + end + end); + end + end + if session.hibernating then + session.log("debug", "hibernating, stanza queued") + -- The session is hibernating, no point in sending the stanza + -- over a dead connection. It will be delivered upon resumption. + return nil; -- or empty string? + end + return stanza; +end + +local function count_incoming_stanzas(stanza, session) + if not stanza.attr.xmlns then + session.handled_stanza_count = session.handled_stanza_count + 1; + session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); + end + return stanza; +end + +local function wrap_session(session, resume) -- Overwrite process_stanza() and send() local queue; if not resume then @@ -80,39 +119,7 @@ queue = session.outgoing_stanza_queue; end - local _send = session.sends2s or session.send; - local function new_send(stanza) - local is_stanza = stanza.attr and not stanza.attr.xmlns; - if is_stanza then -- Stanza in default stream namespace - module:log("debug", "st.clone( %s ) -- %s a stanza", tostring(stanza), is_stanza and "is" or "is not"); - local cached_stanza = st.clone(stanza); - - if cached_stanza and cached_stanza:get_child("delay", xmlns_delay) == nil then - cached_stanza = cached_stanza:tag("delay", { xmlns = xmlns_delay, from = session.host, stamp = datetime.datetime()}); - end - - queue[#queue+1] = cached_stanza; - session.log("debug", "#queue = %d", #queue); - end - if session.hibernating then - session.log("debug", "hibernating, stanza queued") - -- The session is hibernating, no point in sending the stanza - -- over a dead connection. It will be delivered upon resumption. - return true; - end - local ok, err = _send(stanza); - if ok and #queue > max_unacked_stanzas and not session.awaiting_ack and is_stanza then - session.awaiting_ack = true; - return _send(st.stanza("r", sm_attr)); - end - return ok, err; - end - - if session.sends2s then - session.sends2s = new_send; - else - session.send = new_send; - end + add_filter(session, "stanzas/out", outgoing_stanza_filter, -1000); local session_close = session.close; function session.close(...) @@ -125,13 +132,7 @@ if not resume then session.handled_stanza_count = 0; - add_filter(session, "stanzas/in", function (stanza) - if not stanza.attr.xmlns then - session.handled_stanza_count = session.handled_stanza_count + 1; - session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); - end - return stanza; - end); + add_filter(session, "stanzas/in", count_incoming_stanzas, 1000); end return session; @@ -146,9 +147,9 @@ end module:log("debug", "Enabling stream management"); - session.smacks = true; + session.smacks = xmlns_sm; - wrap_session(session, false, xmlns_sm); + wrap_session(session, false); local resume_token; local resume = stanza.attr.resume; @@ -165,9 +166,9 @@ function handle_enabled(session, stanza, xmlns_sm) module:log("debug", "Enabling stream management"); - session.smacks = true; + session.smacks = xmlns_sm; - wrap_session(session, false, xmlns_sm); + wrap_session(session, false); -- FIXME Resume? @@ -310,24 +311,13 @@ original_session.ip = session.ip; original_session.conn = session.conn; original_session.send = session.send; + original_session.send.filter = original_session.filter; original_session.stream = session.stream; original_session.secure = session.secure; original_session.hibernating = nil; - local filter = original_session.filter; - local stream = session.stream; - local log = session.log; - function original_session.data(data) - data = filter("bytes/in", data); - if data then - local ok, err = stream:feed(data); - if ok then return; end - log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); - original_session:close("xml-not-well-formed"); - end - end - wrap_session(original_session, true, xmlns_sm); + wrap_session(original_session, true); -- Inform xmppstream of the new session (passed to its callbacks) - stream:set_session(original_session); + original_session.stream:set_session(original_session); -- Similar for connlisteners c2s_sessions[session.conn] = original_session;