Comparison

mod_smacks/mod_smacks.lua @ 1298:659da45a2b4b

mod_smacks: Handle both version 2 and version 3 namespace
author Florian Zeitz <florob@babelmonkeys.de>
date Mon, 03 Feb 2014 22:17:40 +0100
parent 1293:ddbc1eb8d431
child 1324:853a382c9bd6
comparison
equal deleted inserted replaced
1297:3df303543765 1298:659da45a2b4b
7 local tonumber, tostring = tonumber, tostring; 7 local tonumber, tostring = tonumber, tostring;
8 local add_filter = require "util.filters".add_filter; 8 local add_filter = require "util.filters".add_filter;
9 local timer = require "util.timer"; 9 local timer = require "util.timer";
10 local datetime = require "util.datetime"; 10 local datetime = require "util.datetime";
11 11
12 local xmlns_sm = "urn:xmpp:sm:2"; 12 local xmlns_sm2 = "urn:xmpp:sm:2";
13 local xmlns_sm3 = "urn:xmpp:sm:3";
13 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; 14 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
14 local xmlns_delay = "urn:xmpp:delay"; 15 local xmlns_delay = "urn:xmpp:delay";
15 16
16 local sm_attr = { xmlns = xmlns_sm }; 17 local sm2_attr = { xmlns = xmlns_sm2 };
18 local sm3_attr = { xmlns = xmlns_sm3 };
17 19
18 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300); 20 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300);
19 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false); 21 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
20 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0); 22 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
21 local core_process_stanza = prosody.core_process_stanza; 23 local core_process_stanza = prosody.core_process_stanza;
40 end 42 end
41 43
42 module:hook("stream-features", 44 module:hook("stream-features",
43 function (event) 45 function (event)
44 if can_do_smacks(event.origin, true) then 46 if can_do_smacks(event.origin, true) then
45 event.features:tag("sm", sm_attr):tag("optional"):up():up(); 47 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
48 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
46 end 49 end
47 end); 50 end);
48 51
49 module:hook("s2s-stream-features", 52 module:hook("s2s-stream-features",
50 function (event) 53 function (event)
51 if can_do_smacks(event.origin, true) then 54 if can_do_smacks(event.origin, true) then
52 event.features:tag("sm", sm_attr):tag("optional"):up():up(); 55 event.features:tag("sm", sm2_attr):tag("optional"):up():up();
56 event.features:tag("sm", sm3_attr):tag("optional"):up():up();
53 end 57 end
54 end); 58 end);
55 59
56 module:hook_stanza("http://etherx.jabber.org/streams", "features", 60 module:hook_stanza("http://etherx.jabber.org/streams", "features",
57 function (session, stanza) 61 function (session, stanza)
58 if can_do_smacks(session) and stanza:get_child("sm", xmlns_sm) then 62 if can_do_smacks(session) then
59 session.sends2s(st.stanza("enable", sm_attr)); 63 if stanza:get_child("sm", xmlns_sm3) then
60 end 64 session.sends2s(st.stanza("enable", sm3_attr));
61 end); 65 elseif stanza:get_child("sm", xmlns_sm2) then
62 66 session.sends2s(st.stanza("enable", sm2_attr));
63 local function wrap_session(session, resume) 67 end
68 end
69 end);
70
71 local function wrap_session(session, resume, xmlns_sm)
72 local sm_attr = { xmlns = xmlns_sm };
64 -- Overwrite process_stanza() and send() 73 -- Overwrite process_stanza() and send()
65 local queue; 74 local queue;
66 if not resume then 75 if not resume then
67 queue = {}; 76 queue = {};
68 session.outgoing_stanza_queue = queue; 77 session.outgoing_stanza_queue = queue;
123 end 132 end
124 133
125 return session; 134 return session;
126 end 135 end
127 136
128 module:hook_stanza(xmlns_sm, "enable", function (session, stanza) 137 function handle_enable(session, stanza, xmlns_sm)
129 local ok, err, err_text = can_do_smacks(session); 138 local ok, err, err_text = can_do_smacks(session);
130 if not ok then 139 if not ok then
131 session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it? 140 session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it?
132 session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors})); 141 session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors}));
133 return true; 142 return true;
134 end 143 end
135 144
136 module:log("debug", "Enabling stream management"); 145 module:log("debug", "Enabling stream management");
137 session.smacks = true; 146 session.smacks = true;
138 147
139 wrap_session(session); 148 wrap_session(session, false, xmlns_sm);
140 149
141 local resume_token; 150 local resume_token;
142 local resume = stanza.attr.resume; 151 local resume = stanza.attr.resume;
143 if resume == "true" or resume == "1" then 152 if resume == "true" or resume == "1" then
144 resume_token = uuid_generate(); 153 resume_token = uuid_generate();
145 session_registry[resume_token] = session; 154 session_registry[resume_token] = session;
146 session.resumption_token = resume_token; 155 session.resumption_token = resume_token;
147 end 156 end
148 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); 157 (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
149 return true; 158 return true;
150 end, 100); 159 end
151 160 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
152 module:hook_stanza(xmlns_sm, "enabled", function (session, stanza) 161 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
162
163 function handle_enabled(session, stanza, xmlns_sm)
153 module:log("debug", "Enabling stream management"); 164 module:log("debug", "Enabling stream management");
154 session.smacks = true; 165 session.smacks = true;
155 166
156 wrap_session(session); 167 wrap_session(session, false, xmlns_sm);
157 168
158 -- FIXME Resume? 169 -- FIXME Resume?
159 170
160 return true; 171 return true;
161 end, 100); 172 end
162 173 module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
163 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) 174 module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
175
176 function handle_r(origin, stanza, xmlns_sm)
164 if not origin.smacks then 177 if not origin.smacks then
165 module:log("debug", "Received ack request from non-smack-enabled session"); 178 module:log("debug", "Received ack request from non-smack-enabled session");
166 return; 179 return;
167 end 180 end
168 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count); 181 module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
169 -- Reply with <a> 182 -- Reply with <a>
170 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) })); 183 (origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
171 return true; 184 return true;
172 end); 185 end
173 186 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
174 module:hook_stanza(xmlns_sm, "a", function (origin, stanza) 187 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
188
189 function handle_a(origin, stanza)
175 if not origin.smacks then return; end 190 if not origin.smacks then return; end
176 origin.awaiting_ack = nil; 191 origin.awaiting_ack = nil;
177 -- Remove handled stanzas from outgoing_stanza_queue 192 -- Remove handled stanzas from outgoing_stanza_queue
178 --log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or ""); 193 --log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
179 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza; 194 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
189 for i=1,math_min(handled_stanza_count,#queue) do 204 for i=1,math_min(handled_stanza_count,#queue) do
190 t_remove(origin.outgoing_stanza_queue, 1); 205 t_remove(origin.outgoing_stanza_queue, 1);
191 end 206 end
192 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count; 207 origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
193 return true; 208 return true;
194 end); 209 end
210 module:hook_stanza(xmlns_sm2, "a", handle_a);
211 module:hook_stanza(xmlns_sm3, "a", handle_a);
195 212
196 --TODO: Optimise... incoming stanzas should be handled by a per-session 213 --TODO: Optimise... incoming stanzas should be handled by a per-session
197 -- function that has a counter as an upvalue (no table indexing for increments, 214 -- function that has a counter as an upvalue (no table indexing for increments,
198 -- and won't slow non-198 sessions). We can also then remove the .handled flag 215 -- and won't slow non-198 sessions). We can also then remove the .handled flag
199 -- on stanzas 216 -- on stanzas
254 end 271 end
255 272
256 end 273 end
257 end); 274 end);
258 275
259 module:hook_stanza(xmlns_sm, "resume", function (session, stanza) 276 function handle_resume(session, stanza, xmlns_sm)
260 if session.full_jid then 277 if session.full_jid then
261 session.log("warn", "Tried to resume after resource binding"); 278 session.log("warn", "Tried to resume after resource binding");
262 session.send(st.stanza("failed", sm_attr) 279 session.send(st.stanza("failed", { xmlns = xmlns_sm })
263 :tag("unexpected-request", { xmlns = xmlns_errors }) 280 :tag("unexpected-request", { xmlns = xmlns_errors })
264 ); 281 );
265 return true; 282 return true;
266 end 283 end
267 284
268 local id = stanza.attr.previd; 285 local id = stanza.attr.previd;
269 local original_session = session_registry[id]; 286 local original_session = session_registry[id];
270 if not original_session then 287 if not original_session then
271 session.log("debug", "Tried to resume non-existent session with id %s", id); 288 session.log("debug", "Tried to resume non-existent session with id %s", id);
272 session.send(st.stanza("failed", sm_attr) 289 session.send(st.stanza("failed", { xmlns = xmlns_sm })
273 :tag("item-not-found", { xmlns = xmlns_errors }) 290 :tag("item-not-found", { xmlns = xmlns_errors })
274 ); 291 );
275 elseif session.username == original_session.username 292 elseif session.username == original_session.username
276 and session.host == original_session.host then 293 and session.host == original_session.host then
277 session.log("debug", "mod_smacks resuming existing session..."); 294 session.log("debug", "mod_smacks resuming existing session...");
298 if ok then return; end 315 if ok then return; end
299 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); 316 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
300 original_session:close("xml-not-well-formed"); 317 original_session:close("xml-not-well-formed");
301 end 318 end
302 end 319 end
303 wrap_session(original_session, true); 320 wrap_session(original_session, true, xmlns_sm);
304 -- Inform xmppstream of the new session (passed to its callbacks) 321 -- Inform xmppstream of the new session (passed to its callbacks)
305 stream:set_session(original_session); 322 stream:set_session(original_session);
306 -- Similar for connlisteners 323 -- Similar for connlisteners
307 c2s_sessions[session.conn] = original_session; 324 c2s_sessions[session.conn] = original_session;
308 325
321 end 338 end
322 else 339 else
323 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", 340 module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
324 session.username or "?", session.host or "?", session.type, 341 session.username or "?", session.host or "?", session.type,
325 original_session.username or "?", original_session.host or "?", original_session.type); 342 original_session.username or "?", original_session.host or "?", original_session.type);
326 session.send(st.stanza("failed", sm_attr) 343 session.send(st.stanza("failed", { xmlns = xmlns_sm })
327 :tag("not-authorized", { xmlns = xmlns_errors })); 344 :tag("not-authorized", { xmlns = xmlns_errors }));
328 end 345 end
329 return true; 346 return true;
330 end); 347 end
348 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
349 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);