Comparison

mod_mam/mod_mam.lua @ 701:cc5805f83583

mod_mam: Implement support for Result Set Management in queries.
author Kim Alvefur <zash@zash.se>
date Fri, 08 Jun 2012 03:13:31 +0200
parent 675:da33325453fb
child 702:d94ee0848b27
comparison legend: equal | deleted | inserted | replaced
left column: 700:0c130c45b7c1 | right column: 701:cc5805f83583
6 local xmlns_mam = "urn:xmpp:mam:tmp"; 6 local xmlns_mam = "urn:xmpp:mam:tmp";
7 local xmlns_delay = "urn:xmpp:delay"; 7 local xmlns_delay = "urn:xmpp:delay";
8 local xmlns_forward = "urn:xmpp:forward:0"; 8 local xmlns_forward = "urn:xmpp:forward:0";
9 9
10 local st = require "util.stanza"; 10 local st = require "util.stanza";
11 local rsm = module:require "rsm";
11 local jid_bare = require "util.jid".bare; 12 local jid_bare = require "util.jid".bare;
12 local jid_split = require "util.jid".split; 13 local jid_split = require "util.jid".split;
13 local host = module.host; 14 local host = module.host;
14 15
15 local dm_load = require "util.datamanager".load; 16 local dm_load = require "util.datamanager".load;
60 if stanza.attr.type == "get" then 61 if stanza.attr.type == "get" then
61 local prefs = get_prefs(user); 62 local prefs = get_prefs(user);
62 local default = prefs[false]; 63 local default = prefs[false];
63 default = default ~= nil and default_attrs[default] or global_default_policy; 64 default = default ~= nil and default_attrs[default] or global_default_policy;
64 local reply = st.reply(stanza):tag("prefs", { xmlns = xmlns_mam, default = default }) 65 local reply = st.reply(stanza):tag("prefs", { xmlns = xmlns_mam, default = default })
65 --module:log("debug", "get_prefs(%q) => %s", user, require"util.serialization".serialize(prefs));
66 local always = st.stanza("always"); 66 local always = st.stanza("always");
67 local never = st.stanza("never"); 67 local never = st.stanza("never");
68 for k,v in pairs(prefs) do 68 for k,v in pairs(prefs) do
69 if k then 69 if k then
70 (v and always or never):tag("jid"):text(k):up(); 70 (v and always or never):tag("jid"):text(k):up();
95 local jid = rule:get_text(); 95 local jid = rule:get_text();
96 prefs[jid] = false; 96 prefs[jid] = false;
97 end 97 end
98 end 98 end
99 99
100 --module:log("debug", "set_prefs(%q, %s)", user, require"util.serialization".serialize(prefs));
101 local ok, err = set_prefs(user, prefs); 100 local ok, err = set_prefs(user, prefs);
102 if not ok then 101 if not ok then
103 origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err))); 102 origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err)));
104 else 103 else
105 origin.send(st.reply(stanza)); 104 origin.send(st.reply(stanza));
117 116
118 -- Search query parameters 117 -- Search query parameters
119 local qwith = query:get_child_text("with"); 118 local qwith = query:get_child_text("with");
120 local qstart = query:get_child_text("start"); 119 local qstart = query:get_child_text("start");
121 local qend = query:get_child_text("end"); 120 local qend = query:get_child_text("end");
121 local qset = rsm.get(query);
122 module:log("debug", "Archive query, id %s with %s from %s until %s)", 122 module:log("debug", "Archive query, id %s with %s from %s until %s)",
123 tostring(qid), qwith or "anyone", qstart or "the dawn of time", qend or "now"); 123 tostring(qid), qwith or "anyone", qstart or "the dawn of time", qend or "now");
124 124
125 qstart, qend = (qstart and timestamp_parse(qstart)), (qend and timestamp_parse(qend)) 125 qstart, qend = (qstart and timestamp_parse(qstart)), (qend and timestamp_parse(qend))
126 126
134 origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error loading archive: "..tostring(err))); 134 origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error loading archive: "..tostring(err)));
135 end 135 end
136 return true 136 return true
137 end 137 end
138 138
139 -- RSM stuff
140 local qset_matches = not (qset and qset.after);
141 local first, last, index;
142 local n = 0;
143 local start = qset and qset.index or 1;
144
139 module:log("debug", "Loaded %d items, about to filter", #data); 145 module:log("debug", "Loaded %d items, about to filter", #data);
140 for i=1,#data do 146 for i=start,#data do
141 local item = data[i]; 147 local item = data[i];
142 local when, with, with_bare = item.when, item.with, item.with_bare; 148 local when, with, with_bare = item.when, item.with, item.with_bare;
143 local ts = item.timestamp; 149 local ts = item.timestamp;
144 local id = item.id; 150 local id = item.id;
151 --module:log("debug", "id is %s", id);
152
153 -- RSM pre-send-checking
154 if qset then
155 if qset.before == id then
156 module:log("debug", "End of matching range found");
157 qset_matches = false;
158 break;
159 end
160 end
161
145 --module:log("debug", "message with %s at %s", with, when or "???"); 162 --module:log("debug", "message with %s at %s", with, when or "???");
146 -- Apply query filter 163 -- Apply query filter
147 if (not qwith or ((qwith == with) or (qwith == with_bare))) 164 if (not qwith or ((qwith == with) or (qwith == with_bare)))
148 and (not qstart or when >= qstart) 165 and (not qstart or when >= qstart)
149 and (not qend or when <= qend) then 166 and (not qend or when <= qend)
150 -- Optimizable? Do this when archiving? 167 and (not qset or qset_matches) then
151 --module:log("debug", "sending");
152 local fwd_st = st.message{ to = origin.full_jid } 168 local fwd_st = st.message{ to = origin.full_jid }
153 :tag("result", { xmlns = xmlns_mam, queryid = qid, id = id }):up() 169 :tag("result", { xmlns = xmlns_mam, queryid = qid, id = id }):up()
154 :tag("forwarded", { xmlns = xmlns_forward }) 170 :tag("forwarded", { xmlns = xmlns_forward })
155 :tag("delay", { xmlns = xmlns_delay, stamp = ts or timestamp(when) }):up(); 171 :tag("delay", { xmlns = xmlns_delay, stamp = ts or timestamp(when) }):up();
156 local orig_stanza = st.deserialize(item.stanza); 172 local orig_stanza = st.deserialize(item.stanza);
157 orig_stanza.attr.xmlns = "jabber:client"; 173 orig_stanza.attr.xmlns = "jabber:client";
158 fwd_st:add_child(orig_stanza); 174 fwd_st:add_child(orig_stanza);
159 origin.send(fwd_st); 175 origin.send(fwd_st);
160 elseif qend and when > qend then 176 if not first then
177 index = i;
178 first = id;
179 end
180 last = id;
181 n = n + 1;
182 elseif (qend and when > qend) then
183 module:log("debug", "We have passed into messages more recent than requested");
161 break -- We have passed into messages more recent than requested 184 break -- We have passed into messages more recent than requested
185 end
186
187 -- RSM post-send-checking
188 if qset then
189 if qset.after == id then
190 module:log("debug", "Start of matching range found");
191 qset_matches = true;
192 end
193 if qset.max and n >= qset.max then
194 module:log("debug", "Max number of items matched");
195 break
196 end
162 end 197 end
163 end 198 end
164 -- That's all folks! 199 -- That's all folks!
165 module:log("debug", "Archive query %s completed", tostring(qid)); 200 module:log("debug", "Archive query %s completed", tostring(qid));
166 origin.send(st.reply(stanza)); 201 origin.send(st.reply(stanza):add_child(rsm.generate{first = { index = index; first }, last = last}));
167 return true 202 return true
168 end 203 end
169 end); 204 end);
170 205
171 local function has_in_roster(user, who) 206 local function has_in_roster(user, who)
219 254
220 local store_user, store_host = jid_split(c2s and orig_from or orig_to); 255 local store_user, store_host = jid_split(c2s and orig_from or orig_to);
221 local target_jid = c2s and orig_to or orig_from; 256 local target_jid = c2s and orig_to or orig_from;
222 local target_bare = jid_bare(target_jid); 257 local target_bare = jid_bare(target_jid);
223 258
224 assert(store_host == host, "This should not happen.");
225
226 if shall_store(store_user, target_bare) then 259 if shall_store(store_user, target_bare) then
227 module:log("debug", "Archiving stanza: %s", stanza:top_tag()); 260 module:log("debug", "Archiving stanza: %s", stanza:top_tag());
228 261
229 local id = uuid(); 262 local id = uuid();
230 local when = time_now(); 263 local when = time_now();
231 -- And stash it 264 -- And stash it
232 local ok, err = dm_list_append(store_user, store_host, archive_store, { 265 local ok, err = dm_list_append(store_user, store_host, archive_store, {
233 -- WARNING This format may change. 266 -- WARNING This format may change.
234 id = id, 267 id = id,
235 when = when, -- This might be an UNIX timestamp. Probably. 268 when = when,
236 timestamp = timestamp(when), -- Textual timestamp. But I'll assume that comparing numbers is faster and less annoying in case of timezones.
237 with = target_jid, 269 with = target_jid,
238 with_bare = target_bare, -- Optimization, to avoid loads of jid_bare() calls when filtering. 270 with_bare = target_bare, -- Optimization, to avoid loads of jid_bare() calls when filtering.
239 stanza = st.preserialize(stanza) 271 stanza = st.preserialize(stanza)
240 }); 272 });
241 --[[ This was dropped from the spec 273 --[[ This was dropped from the spec