Comparison

plugins/smacks.lua @ 321:369d4638d775

plugins.smacks: Re-send unacked outgoing stanzas on resumption
author Kim Alvefur <zash@zash.se>
date Sun, 10 Feb 2013 01:54:30 +0100
parent 320:e04f10664704
child 324:dbb3362c1ff3
comparison
equal deleted inserted replaced
320:e04f10664704 321:369d4638d775
15 if stanza.attr.xmlns == "jabber:client" or not stanza.attr.xmlns then 15 if stanza.attr.xmlns == "jabber:client" or not stanza.attr.xmlns then
16 handled_stanza_count = handled_stanza_count + 1; 16 handled_stanza_count = handled_stanza_count + 1;
17 stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag()); 17 stream:debug("Increasing handled stanzas to %d for %s", handled_stanza_count, stanza:top_tag());
18 end 18 end
19 end 19 end
20 20
21 -- Catch outgoing stanzas
22 function outgoing_stanza(stanza)
23 -- NOTE: This will not behave nice if stanzas are serialized before this point
24 if stanza.name and not stanza.attr.xmlns then
25 -- serialize stanzas in order to bypass this on resumption
26 outgoing_queue[#outgoing_queue+1] = tostring(stanza);
27 verse.add_task(1, function()
28 if #outgoing_queue > 0 then
29 stream:send(verse.stanza("r", { xmlns = xmlns_sm }));
30 end
31 end);
32 end
33 end
34
21 local function on_disconnect() 35 local function on_disconnect()
22 stream:debug("smacks: connection lost"); 36 stream:debug("smacks: connection lost");
23 stream.stream_management_supported = nil; 37 stream.stream_management_supported = nil;
24 if stream.resumption_token then 38 if stream.resumption_token then
25 stream:debug("smacks: have resumption token, reconnecting in 1s..."); 39 stream:debug("smacks: have resumption token, reconnecting in 1s...");
47 else 61 else
48 stream:warn("Received bad ack for "..new_ack.." when last ack was "..last_ack); 62 stream:warn("Received bad ack for "..new_ack.." when last ack was "..last_ack);
49 end 63 end
50 elseif stanza.name == "enabled" then 64 elseif stanza.name == "enabled" then
51 stream.smacks = true; 65 stream.smacks = true;
52 -- Catch outgoing stanzas 66
53 local old_send = stream.send; 67 -- Catch stanzas
54 function stream.send(stream, stanza)
55 stream:warn("SENDING");
56 if stanza.name and not stanza.attr.xmlns then
57 outgoing_queue[#outgoing_queue+1] = stanza;
58 local ret = old_send(stream, stanza);
59 old_send(stream, verse.stanza("r", { xmlns = xmlns_sm }));
60 return ret;
61 end
62 return old_send(stream, stanza);
63 end
64 -- Catch incoming stanzas
65 stream:hook("stanza", incoming_stanza); 68 stream:hook("stanza", incoming_stanza);
69 stream:hook("outgoing", outgoing_stanza);
66 70
67 if stanza.attr.id then 71 if stanza.attr.id then
68 stream.resumption_token = stanza.attr.id; 72 stream.resumption_token = stanza.attr.id;
69 stream:hook("disconnected", on_disconnect, 100); 73 stream:hook("disconnected", on_disconnect, 100);
70 end 74 end
71 elseif stanza.name == "resumed" then 75 elseif stanza.name == "resumed" then
72 --TODO: Check h of the resumed element, discard any acked stanzas from 76 local new_ack = tonumber(stanza.attr.h);
73 -- our queue (to prevent duplicates), then re-send any lost stanzas. 77 if new_ack > last_ack then
78 local old_unacked = #outgoing_queue;
79 for i=last_ack+1,new_ack do
80 table.remove(outgoing_queue, 1);
81 end
82 stream:debug("Received ack: New ack: "..new_ack.." Last ack: "..last_ack.." Unacked stanzas now: "..#outgoing_queue.." (was "..old_unacked..")");
83 last_ack = new_ack;
84 end
85 for i=1,#outgoing_queue do
86 stream:send(outgoing_queue[i]);
87 end
88 outgoing_queue = {};
74 stream:debug("Resumed successfully"); 89 stream:debug("Resumed successfully");
75 stream:event("resumed"); 90 stream:event("resumed");
76 else 91 else
77 stream:warn("Don't know how to handle "..xmlns_sm.."/"..stanza.name); 92 stream:warn("Don't know how to handle "..xmlns_sm.."/"..stanza.name);
78 end 93 end