Software /
code /
prosody-modules
Comparison
mod_smacks/mod_smacks.lua @ 160:9a7671720dec
mod_smacks: XEP-0198 Stream Management acks. Initial commit - very rough, useful mainly for testing at the moment, most certainly contains bugs :)
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Thu, 03 Jun 2010 01:08:58 +0100 |
child | 200:64a573203c20 |
comparison
equal
deleted
inserted
replaced
159:9a37898f4f7c | 160:9a7671720dec |
---|---|
1 local st = require "util.stanza"; | |
2 | |
3 local t_insert, t_remove = table.insert, table.remove; | |
4 local tonumber, tostring = tonumber, tostring; | |
5 | |
6 local xmlns_sm = "urn:xmpp:sm:2"; | |
7 | |
8 local sm_attr = { xmlns = xmlns_sm }; | |
9 | |
10 module:add_event_hook("stream-features", | |
11 function (session, features) | |
12 features:tag("sm", sm_attr):tag("optional"):up():up(); | |
13 end); | |
14 | |
15 module:hook("s2s-stream-features", | |
16 function (data) | |
17 data.features:tag("sm", sm_attr):tag("optional"):up():up(); | |
18 end); | |
19 | |
20 module:hook_stanza(xmlns_sm, "enable", | |
21 function (session, stanza) | |
22 module:log("debug", "Enabling stream management"); | |
23 session.smacks = true; | |
24 session.handled_stanza_count = 0; | |
25 -- Overwrite process_stanza() and send() | |
26 local queue, queue_length = {}, 0; | |
27 session.outgoing_stanza_queue, session.outgoing_stanza_count = queue, queue_length; | |
28 local _send = session.send; | |
29 function session.send(stanza) | |
30 local attr = stanza.attr; | |
31 if attr and not attr.xmlns then -- Stanza in default stream namespace | |
32 queue_length = queue_length + 1; | |
33 session.outgoing_stanza_count = queue_length; | |
34 queue[queue_length] = st.reply(stanza); | |
35 end | |
36 local ok, err = _send(stanza); | |
37 if ok then | |
38 return _send(st.stanza("r", { xmlns = xmlns_sm })); | |
39 end | |
40 return ok, err; | |
41 end | |
42 _send(st.stanza("enabled", sm_attr)); | |
43 return true; | |
44 end); | |
45 | |
46 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) | |
47 if not origin.smacks then | |
48 module:log("debug", "Received ack request from non-smack-enabled session"); | |
49 return; | |
50 end | |
51 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); | |
52 -- Reply with <a> | |
53 origin.send(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) })); | |
54 return true; | |
55 end); | |
56 | |
57 module:hook_stanza(xmlns_sm, "a", function (origin, stanza) | |
58 if not origin.smacks then return; end | |
59 | |
60 -- Remove handled stanzas from outgoing_stanza_queue | |
61 local handled_stanza_count = tonumber(stanza.attr.h)+1; | |
62 for i=1,handled_stanza_count do | |
63 t_remove(origin.outgoing_stanza_queue, 1); | |
64 end | |
65 return true; | |
66 end); | |
67 | |
68 --TODO: Optimise... incoming stanzas should be handled by a per-session | |
69 -- function that has a counter as an upvalue (no table indexing for increments, | |
70 -- and won't slow non-198 sessions). We can also then remove the .handled flag | |
71 -- on stanzas | |
72 | |
73 function catch_all_incoming_stanzas(data) | |
74 local origin, stanza = data.origin, data.stanza; | |
75 if origin.smacks and not stanza.handled then | |
76 stanza.handled = true; | |
77 origin.handled_stanza_count = origin.handled_stanza_count + 1; | |
78 module:log("debug", "Handled %d stanzas", origin.handled_stanza_count); | |
79 end | |
80 end | |
81 module:hook("message/bare", catch_all_incoming_stanzas, 1000); | |
82 module:hook("message/full", catch_all_incoming_stanzas, 1000); | |
83 module:hook("message/host", catch_all_incoming_stanzas, 1000); | |
84 | |
85 module:hook("presence/bare", catch_all_incoming_stanzas, 1000); | |
86 module:hook("presence/full", catch_all_incoming_stanzas, 1000); | |
87 module:hook("presence/host", catch_all_incoming_stanzas, 1000); | |
88 | |
89 module:hook("iq/bare", catch_all_incoming_stanzas, 1000); | |
90 module:hook("iq/full", catch_all_incoming_stanzas, 1000); | |
91 module:hook("iq/host", catch_all_incoming_stanzas, 1000); | |
92 | |
93 function handle_unacked_stanzas(session) | |
94 local queue = session.outgoing_stanza_queue; | |
95 local error_attr = { type = "cancel" }; | |
96 if #queue > 0 then | |
97 for i=1,#queue do | |
98 local reply = queue[i]; | |
99 if reply.attr.to ~= session.full_jid then | |
100 reply.attr.type = "error"; | |
101 reply:tag("error", error_attr) | |
102 :tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}); | |
103 core_process_stanza(session, queue[i]); | |
104 end | |
105 queue[i] = nil; | |
106 end | |
107 end | |
108 end | |
109 | |
110 local _destroy_session = sessionmanager.destroy_session; | |
111 function sessionmanager.destroy_session(session, err) | |
112 if session.smacks then | |
113 local queue = session.outgoing_stanza_queue; | |
114 if #queue > 0 then | |
115 module:log("warn", "Destroying session with %d unacked stanzas:", #queue); | |
116 for i=1,#queue do | |
117 module:log("warn", "::%s", tostring(queue[i])); | |
118 end | |
119 handle_unacked_stanzas(session); | |
120 end | |
121 end | |
122 return _destroy_session(session, err); | |
123 end |