Comparison

mod_smacks/mod_smacks.lua @ 2624:c110b6bfe5d1

mod_smacks: Prevent ack loop on misbehaving clients
author tmolitor <thilo@eightysoft.de>
date Wed, 15 Mar 2017 16:24:03 +0100
parent 2623:a65260300708
child 2670:6e01878103c0
comparison
equal deleted inserted replaced
2623:a65260300708 2624:c110b6bfe5d1
136 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); 136 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
137 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); 137 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
138 end 138 end
139 end); 139 end);
140 140
141 local function request_ack_if_needed(session) 141 local function request_ack_if_needed(session, force)
142 local queue = session.outgoing_stanza_queue; 142 local queue = session.outgoing_stanza_queue;
143 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then 143 if session.awaiting_ack == nil then
144 session.log("debug", "Queuing <r> (in a moment)"); 144 if (#queue > max_unacked_stanzas and session.last_queue_count ~= #queue) or force then
145 session.awaiting_ack = false; 145 session.log("debug", "Queuing <r> (in a moment)");
146 session.awaiting_ack_timer = stoppable_timer(1e-06, function () 146 session.awaiting_ack = false;
147 if not session.awaiting_ack then 147 session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
148 session.log("debug", "Sending <r> (inside timer, before send)"); 148 if not session.awaiting_ack then
149 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) 149 session.log("debug", "Sending <r> (inside timer, before send)");
150 session.log("debug", "Sending <r> (inside timer, after send)"); 150 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
151 session.awaiting_ack = true; 151 session.log("debug", "Sending <r> (inside timer, after send)");
152 if not session.delayed_ack_timer then 152 session.awaiting_ack = true;
153 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() 153 if not session.delayed_ack_timer then
154 delayed_ack_function(session); 154 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
155 end); 155 delayed_ack_function(session);
156 end);
157 end
156 end 158 end
157 end 159 end);
158 end); 160 end
159 end 161 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
160 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue 162 -- and there isn't already a timer for this event running.
161 -- and there isn't already a timer for this event running. 163 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
162 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event 164 -- would not trigger this event (again).
163 -- would not trigger this event (again). 165 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then
164 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then 166 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
165 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); 167 delayed_ack_function(session);
166 delayed_ack_function(session); 168 end
167 end 169 end
170 session.last_queue_count = #queue;
168 end 171 end
169 172
170 local function outgoing_stanza_filter(stanza, session) 173 local function outgoing_stanza_filter(stanza, session)
171 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":"; 174 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":";
172 if is_stanza and not stanza._cached then -- Stanza in default stream namespace 175 if is_stanza and not stanza._cached then -- Stanza in default stream namespace
182 session.log("debug", "#queue = %d", #queue); 185 session.log("debug", "#queue = %d", #queue);
183 if session.hibernating then 186 if session.hibernating then
184 session.log("debug", "hibernating, stanza queued"); 187 session.log("debug", "hibernating, stanza queued");
185 return nil; 188 return nil;
186 end 189 end
187 request_ack_if_needed(session); 190 request_ack_if_needed(session, false);
188 end 191 end
189 return stanza; 192 return stanza;
190 end 193 end
191 194
192 local function count_incoming_stanzas(stanza, session) 195 local function count_incoming_stanzas(stanza, session)
335 for i=1,math_min(handled_stanza_count,#queue) do 338 for i=1,math_min(handled_stanza_count,#queue) do
336 t_remove(origin.outgoing_stanza_queue, 1); 339 t_remove(origin.outgoing_stanza_queue, 1);
337 end 340 end
338 origin.log("debug", "#queue = %d", #queue); 341 origin.log("debug", "#queue = %d", #queue);
339 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; 342 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
340 request_ack_if_needed(origin) 343 request_ack_if_needed(origin, false)
341 return true; 344 return true;
342 end 345 end
343 module:hook_stanza(xmlns_sm2, "a", handle_a); 346 module:hook_stanza(xmlns_sm2, "a", handle_a);
344 module:hook_stanza(xmlns_sm3, "a", handle_a); 347 module:hook_stanza(xmlns_sm3, "a", handle_a);
345 348
500 original_session.log("debug", "#queue = %d -- after send", #queue); 503 original_session.log("debug", "#queue = %d -- after send", #queue);
501 function session.send(stanza) 504 function session.send(stanza)
502 session.log("warn", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); 505 session.log("warn", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
503 return false; 506 return false;
504 end 507 end
508 request_ack_if_needed(original_session, true);
505 else 509 else
506 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", 510 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
507 session.username or "?", session.host or "?", session.type, 511 session.username or "?", session.host or "?", session.type,
508 original_session.username or "?", original_session.host or "?", original_session.type); 512 original_session.username or "?", original_session.host or "?", original_session.type);
509 session.send(st.stanza("failed", { xmlns = xmlns_sm }) 513 session.send(st.stanza("failed", { xmlns = xmlns_sm })