Comparison

mod_smacks/mod_smacks.lua @ 2491:5fbca7de2088

mod_smacks: Send out more ack requests where needed. Under some circumstances it was possible that more than "max_unacked_stanzas" stanzas were left in the outgoing stanza queue without forcing an ack. This could happen when more stanzas entered the queue while the last ack request was still unanswered. Now the test "#queue > max_unacked_stanzas" is done upon receiving an ack as well as when sending out stanzas, which fixes this bug.
author tmolitor <thilo@eightysoft.de>
date Sun, 12 Feb 2017 19:27:50 +0100
parent 2417:5e7badecf7fe
child 2494:d300ae5dba87
comparison
equal deleted inserted replaced
2490:85509650ba82 2491:5fbca7de2088
3 -- Copyright (C) 2010-2015 Matthew Wild 3 -- Copyright (C) 2010-2015 Matthew Wild
4 -- Copyright (C) 2010 Waqas Hussain 4 -- Copyright (C) 2010 Waqas Hussain
5 -- Copyright (C) 2012-2015 Kim Alvefur 5 -- Copyright (C) 2012-2015 Kim Alvefur
6 -- Copyright (C) 2012 Thijs Alkemade 6 -- Copyright (C) 2012 Thijs Alkemade
7 -- Copyright (C) 2014 Florian Zeitz 7 -- Copyright (C) 2014 Florian Zeitz
8 -- Copyright (C) 2016 Thilo Molitor 8 -- Copyright (C) 2016-2017 Thilo Molitor
9 -- 9 --
10 -- This project is MIT/X11 licensed. Please see the 10 -- This project is MIT/X11 licensed. Please see the
11 -- COPYING file in the source package for more information. 11 -- COPYING file in the source package for more information.
12 -- 12 --
13 13
78 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); 78 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
79 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); 79 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
80 end 80 end
81 end); 81 end);
82 82
83 local function request_ack_if_needed(session)
84 local queue = session.outgoing_stanza_queue;
85 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then
86 session.log("debug", "Queuing <r> (in a moment)");
87 session.awaiting_ack = false;
88 session.awaiting_ack_timer = module:add_timer(1e-06, function ()
89 if not session.awaiting_ack then
90 session.log("debug", "Sending <r> (inside timer, before send)");
91 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
92 session.log("debug", "Sending <r> (inside timer, after send)");
93 session.awaiting_ack = true;
94 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
95 delayed_ack_function(session);
96 end);
97 end
98 end);
99 end
100 end
101
83 local function outgoing_stanza_filter(stanza, session) 102 local function outgoing_stanza_filter(stanza, session)
84 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":"; 103 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":";
85 if is_stanza and not stanza._cached then -- Stanza in default stream namespace 104 if is_stanza and not stanza._cached then -- Stanza in default stream namespace
86 local queue = session.outgoing_stanza_queue; 105 local queue = session.outgoing_stanza_queue;
87 local cached_stanza = st.clone(stanza); 106 local cached_stanza = st.clone(stanza);
95 session.log("debug", "#queue = %d", #queue); 114 session.log("debug", "#queue = %d", #queue);
96 if session.hibernating then 115 if session.hibernating then
97 session.log("debug", "hibernating, stanza queued"); 116 session.log("debug", "hibernating, stanza queued");
98 return nil; 117 return nil;
99 end 118 end
100 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then 119 request_ack_if_needed(session);
101 session.log("debug", "Queuing <r> (in a moment)");
102 session.awaiting_ack = false;
103 session.awaiting_ack_timer = module:add_timer(1e-06, function ()
104 if not session.awaiting_ack then
105 session.log("debug", "Sending <r> (before send)");
106 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
107 session.log("debug", "Sending <r> (after send)");
108 session.awaiting_ack = true;
109 session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
110 delayed_ack_function(session);
111 end);
112 end
113 end);
114 end
115 end 120 end
116 return stanza; 121 return stanza;
117 end 122 end
118 123
119 local function count_incoming_stanzas(stanza, session) 124 local function count_incoming_stanzas(stanza, session)
236 end 241 end
237 if origin.delayed_ack_timer then 242 if origin.delayed_ack_timer then
238 origin.delayed_ack_timer:stop(); 243 origin.delayed_ack_timer:stop();
239 end 244 end
240 -- Remove handled stanzas from outgoing_stanza_queue 245 -- Remove handled stanzas from outgoing_stanza_queue
241 --log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); 246 log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
242 local h = tonumber(stanza.attr.h); 247 local h = tonumber(stanza.attr.h);
243 if not h then 248 if not h then
244 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; }; 249 origin:close{ condition = "invalid-xml"; text = "Missing or invalid 'h' attribute"; };
245 end 250 end
246 local handled_stanza_count = h-origin.last_acknowledged_stanza; 251 local handled_stanza_count = h-origin.last_acknowledged_stanza;
256 for i=1,math_min(handled_stanza_count,#queue) do 261 for i=1,math_min(handled_stanza_count,#queue) do
257 t_remove(origin.outgoing_stanza_queue, 1); 262 t_remove(origin.outgoing_stanza_queue, 1);
258 end 263 end
259 origin.log("debug", "#queue = %d", #queue); 264 origin.log("debug", "#queue = %d", #queue);
260 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; 265 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
266 request_ack_if_needed(origin)
261 return true; 267 return true;
262 end 268 end
263 module:hook_stanza(xmlns_sm2, "a", handle_a); 269 module:hook_stanza(xmlns_sm2, "a", handle_a);
264 module:hook_stanza(xmlns_sm3, "a", handle_a); 270 module:hook_stanza(xmlns_sm3, "a", handle_a);
265 271