Software /
code /
prosody-modules
Comparison
mod_smacks/mod_smacks.lua @ 2624:c110b6bfe5d1
mod_smacks: Prevent ack loop on misbehaving clients
author | tmolitor <thilo@eightysoft.de> |
---|---|
date | Wed, 15 Mar 2017 16:24:03 +0100 |
parent | 2623:a65260300708 |
child | 2670:6e01878103c0 |
comparison
equal
deleted
inserted
replaced
2623:a65260300708 | 2624:c110b6bfe5d1 |
---|---|
136 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); | 136 event.features:tag("sm", sm2_attr):tag("optional"):up():up(); |
137 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); | 137 event.features:tag("sm", sm3_attr):tag("optional"):up():up(); |
138 end | 138 end |
139 end); | 139 end); |
140 | 140 |
141 local function request_ack_if_needed(session) | 141 local function request_ack_if_needed(session, force) |
142 local queue = session.outgoing_stanza_queue; | 142 local queue = session.outgoing_stanza_queue; |
143 if #queue > max_unacked_stanzas and session.awaiting_ack == nil then | 143 if session.awaiting_ack == nil then |
144 session.log("debug", "Queuing <r> (in a moment)"); | 144 if (#queue > max_unacked_stanzas and session.last_queue_count ~= #queue) or force then |
145 session.awaiting_ack = false; | 145 session.log("debug", "Queuing <r> (in a moment)"); |
146 session.awaiting_ack_timer = stoppable_timer(1e-06, function () | 146 session.awaiting_ack = false; |
147 if not session.awaiting_ack then | 147 session.awaiting_ack_timer = stoppable_timer(1e-06, function () |
148 session.log("debug", "Sending <r> (inside timer, before send)"); | 148 if not session.awaiting_ack then |
149 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) | 149 session.log("debug", "Sending <r> (inside timer, before send)"); |
150 session.log("debug", "Sending <r> (inside timer, after send)"); | 150 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
151 session.awaiting_ack = true; | 151 session.log("debug", "Sending <r> (inside timer, after send)"); |
152 if not session.delayed_ack_timer then | 152 session.awaiting_ack = true; |
153 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() | 153 if not session.delayed_ack_timer then |
154 delayed_ack_function(session); | 154 session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() |
155 end); | 155 delayed_ack_function(session); |
| | 156 end); |
| | 157 end |
156 end | 158 end |
157 end | 159 end); |
158 end); | 160 end |
159 end | 161 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue |
160 -- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue | 162 -- and there isn't already a timer for this event running. |
161 -- and there isn't already a timer for this event running. | 163 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event |
162 -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event | 164 -- would not trigger this event (again). |
163 -- would not trigger this event (again). | 165 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then |
164 if #queue > max_unacked_stanzas and session.awaiting_ack and session.delayed_ack_timer == nil then | 166 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); |
165 session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); | 167 delayed_ack_function(session); |
166 delayed_ack_function(session); | 168 end |
167 end | 169 end |
| | 170 session.last_queue_count = #queue; |
168 end | 171 end |
169 | 172 |
170 local function outgoing_stanza_filter(stanza, session) | 173 local function outgoing_stanza_filter(stanza, session) |
171 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":"; | 174 local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":"; |
172 if is_stanza and not stanza._cached then -- Stanza in default stream namespace | 175 if is_stanza and not stanza._cached then -- Stanza in default stream namespace |
182 session.log("debug", "#queue = %d", #queue); | 185 session.log("debug", "#queue = %d", #queue); |
183 if session.hibernating then | 186 if session.hibernating then |
184 session.log("debug", "hibernating, stanza queued"); | 187 session.log("debug", "hibernating, stanza queued"); |
185 return nil; | 188 return nil; |
186 end | 189 end |
187 request_ack_if_needed(session); | 190 request_ack_if_needed(session, false); |
188 end | 191 end |
189 return stanza; | 192 return stanza; |
190 end | 193 end |
191 | 194 |
192 local function count_incoming_stanzas(stanza, session) | 195 local function count_incoming_stanzas(stanza, session) |
335 for i=1,math_min(handled_stanza_count,#queue) do | 338 for i=1,math_min(handled_stanza_count,#queue) do |
336 t_remove(origin.outgoing_stanza_queue, 1); | 339 t_remove(origin.outgoing_stanza_queue, 1); |
337 end | 340 end |
338 origin.log("debug", "#queue = %d", #queue); | 341 origin.log("debug", "#queue = %d", #queue); |
339 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; | 342 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; |
340 request_ack_if_needed(origin) | 343 request_ack_if_needed(origin, false) |
341 return true; | 344 return true; |
342 end | 345 end |
343 module:hook_stanza(xmlns_sm2, "a", handle_a); | 346 module:hook_stanza(xmlns_sm2, "a", handle_a); |
344 module:hook_stanza(xmlns_sm3, "a", handle_a); | 347 module:hook_stanza(xmlns_sm3, "a", handle_a); |
345 | 348 |
500 original_session.log("debug", "#queue = %d -- after send", #queue); | 503 original_session.log("debug", "#queue = %d -- after send", #queue); |
501 function session.send(stanza) | 504 function session.send(stanza) |
502 session.log("warn", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); | 505 session.log("warn", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); |
503 return false; | 506 return false; |
504 end | 507 end |
| | 508 request_ack_if_needed(original_session, true); |
505 else | 509 else |
506 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", | 510 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", |
507 session.username or "?", session.host or "?", session.type, | 511 session.username or "?", session.host or "?", session.type, |
508 original_session.username or "?", original_session.host or "?", original_session.type); | 512 original_session.username or "?", original_session.host or "?", original_session.type); |
509 session.send(st.stanza("failed", { xmlns = xmlns_sm }) | 513 session.send(st.stanza("failed", { xmlns = xmlns_sm }) |