Comparison

plugins/mod_storage_sql2.lua @ 6745:6728ad041761

Merge 0.10->trunk
author Kim Alvefur <zash@zash.se>
date Thu, 25 Jun 2015 18:57:43 +0200
parent 6729:daa5c83b2879
parent 6740:99b3f29c3c71
child 6749:9dc05d3efdc9
comparison
equal deleted inserted replaced
6729:daa5c83b2879 6745:6728ad041761
1 1
2 local json = require "util.json"; 2 local json = require "util.json";
3 local sql = require "util.sql";
3 local xml_parse = require "util.xml".parse; 4 local xml_parse = require "util.xml".parse;
4 local uuid = require "util.uuid"; 5 local uuid = require "util.uuid";
5 local resolve_relative_path = require "util.paths".resolve_relative_path; 6 local resolve_relative_path = require "util.paths".resolve_relative_path;
6 7
7 local stanza_mt = require"util.stanza".stanza_mt; 8 local stanza_mt = require"util.stanza".stanza_mt;
18 return unpack(row); 19 return unpack(row);
19 end 20 end
20 end, result, nil; 21 end, result, nil;
21 end 22 end
22 23
23 local mod_sql = module:require("sql"); 24 local default_params = { driver = "SQLite3" };
24 local params = module:get_option("sql"); 25
25 26 local engine;
26 local engine; -- TODO create engine 27
--- Turn a Lua value into a (type tag, string) pair suitable for the
-- `type` and `value` columns.
-- Strings, booleans and numbers are stored via tostring() under their own
-- Lua type name; stanzas are stored as "xml"; any other table is JSON-encoded.
-- @param value the value to store
-- @return type tag and encoded string, or nil and an error message
local function serialize(value)
	local kind = type(value);
	if kind == "string" or kind == "boolean" or kind == "number" then
		return kind, tostring(value);
	end
	-- Stanzas are tables too, so they must be recognised before the
	-- generic table case.
	if is_stanza(value) then
		return "xml", tostring(value);
	end
	if kind ~= "table" then
		return nil, "Unhandled value type: "..kind;
	end
	local encoded, err = json.encode(value);
	if encoded then
		return "json", encoded;
	end
	return nil, err;
end
--- Inverse of serialize(): rebuild a Lua value from its stored type tag and
-- string form.
-- @param t the stored type tag ("string", "boolean", "number", "json", "xml")
-- @param value the stored string
-- @return the decoded value; nothing for an unknown tag or a boolean whose
--   text is neither "true" nor "false"
local function deserialize(t, value)
	if t == "string" then
		return value;
	end
	if t == "number" then
		return tonumber(value);
	end
	if t == "json" then
		return json.decode(value);
	end
	if t == "xml" then
		return xml_parse(value);
	end
	if t == "boolean" then
		if value == "true" then
			return true;
		elseif value == "false" then
			return false;
		end
	end
end
53
local host = module.host;
-- Set by keyval_store:get()/:set() right before the transaction runs, so the
-- argument-less transaction callbacks below know what to operate on.
local user, store;

--- Transaction body: load every row of the current (host, user, store) and
-- assemble them into a single table.
-- Rows with a non-empty key become result[key]; a row with the empty key is
-- expected to hold a table whose entries are merged into the result.
-- @return the assembled table, or nothing if no rows exist
local function keyval_store_get()
	local haveany;
	local result = {};
	for row in engine:select("SELECT `key`,`type`,`value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store) do
		haveany = true;
		local k = row[1];
		local v = deserialize(row[2], row[3]);
		-- Compare against nil explicitly: a stored boolean `false` is a
		-- legitimate value and must survive the round trip.
		if k and v ~= nil then
			if k ~= "" then
				result[k] = v;
			elseif type(v) == "table" then
				for a,b in pairs(v) do
					result[a] = b;
				end
			end
		end
	end
	if haveany then
		return result;
	end
end
--- Transaction body: replace everything stored for the current
-- (host, user, store) with the contents of `data`.
-- Entries with non-empty string keys get one row each; all remaining entries
-- are collected into one table and stored in a single row under the empty key.
-- Raises (via assert) if a value cannot be serialized.
-- @param data table to store; nil or an empty table just clears the store
-- @return true
local function keyval_store_set(data)
	engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store);

	if not data or next(data) == nil then
		return true;
	end

	local leftovers = {};
	for key, value in pairs(data) do
		if type(key) == "string" and key ~= "" then
			local vtype, encoded = serialize(value);
			assert(vtype, encoded);
			engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, key, vtype, encoded);
		else
			leftovers[key] = value;
		end
	end

	if next(leftovers) ~= nil then
		local vtype, encoded = serialize(leftovers);
		assert(vtype, encoded);
		engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, "", vtype, encoded);
	end
	return true;
end
98
--- Key/value store API (default store type)

local keyval_store = {};
keyval_store.__index = keyval_store;

--- Fetch the whole table stored for a user.
-- @param username user to read; nil is queried as the empty user name
-- @return the stored table (nil if nothing is stored), or nil and an error
--   if the transaction failed
function keyval_store:get(username)
	user, store = username, self.store;
	local ok, result = engine:transaction(keyval_store_get);
	if ok then
		return result;
	end
	module:log("error", "Unable to read from database %s store for %s: %s", store, username or "<host>", result);
	return nil, result;
end
--- Replace the whole table stored for a user, inside one transaction.
-- @param username user to write; nil is stored under the empty user name
-- @param data table to store (nil or empty clears the store)
-- @return whatever engine:transaction() returns for the write
function keyval_store:set(username, data)
	user, store = username, self.store;
	local function write_all()
		return keyval_store_set(data);
	end
	return engine:transaction(write_all);
end
--- Iterate over the distinct users that have data in this store on this host.
-- @return an iterator built from the query result, or the failed
--   transaction's status and error
function keyval_store:users()
	local store_name = self.store;
	local ok, result = engine:transaction(function()
		return engine:select("SELECT DISTINCT `user` FROM `prosody` WHERE `host`=? AND `store`=?", host, store_name);
	end);
	if ok then
		return iterator(result);
	end
	return ok, result;
end
125
--- Map store API (per-key access to the same `prosody` table)

local map_store = {};
map_store.__index = map_store;

--- Fetch the value stored under a single key.
-- Only non-empty string keys are supported; anything else raises inside the
-- transaction and is reported as a failure.
-- @param username user to read; nil is queried as the empty user name
-- @param key non-empty string key
-- @return the decoded value of the first matching row (nil if none), or nil
--   and an error
function map_store:get(username, key)
	local store_name = self.store;
	local ok, result = engine:transaction(function()
		if type(key) ~= "string" or key == "" then
			error("TODO: non-string keys");
		end
		-- Only the first row matters, so return from inside the loop.
		for row in engine:select("SELECT `type`, `value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, username or "", store_name, key) do
			return deserialize(row[1], row[2]);
		end
	end);
	if ok then
		return result;
	end
	return nil, result;
end
--- Store, replace or remove the value under a single key.
-- The existing row for the key is always deleted first; a new row is
-- inserted only when `data` is not nil, so passing nil removes the key.
-- Only non-empty string keys are supported.
-- @param username user to write; nil is stored under the empty user name
-- @param key non-empty string key
-- @param data value to store, or nil to delete
-- @return true on success, or nil and an error
function map_store:set(username, key, data)
	local store_name = self.store;
	local ok, result = engine:transaction(function()
		if type(key) ~= "string" or key == "" then
			error("TODO: non-string keys");
		end
		engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?",
			host, username or "", store_name, key);
		if data ~= nil then
			local vtype, encoded = assert(serialize(data));
			engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, username or "", store_name, key, vtype, encoded);
		end
		return true;
	end);
	if ok then
		return result;
	end
	return nil, result;
end
160
--- Archive store API

local archive_store = {}
archive_store.caps = {
	total = true;
};
archive_store.__index = archive_store

--- Add an item to a user's archive, replacing any existing item with the
-- same key.
-- If the fourth argument is not a number the last three arguments are taken
-- to be in the order (when, with, value) and are rotated into place.
-- @param username archive owner; nil is stored under the empty user name
-- @param key item id; a UUID is generated when it is nil/false
-- @param value the item to store
-- @param when numeric timestamp
-- @param with related name stored in the `with` column
-- @return transaction status followed by the key on success, or the error
function archive_store:append(username, key, value, when, with)
	if type(when) ~= "number" then
		when, with, value = value, when, with;
	end
	local user,store = username,self.store;
	return engine:transaction(function()
		if key then
			engine:delete("DELETE FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, user or "", store, key);
		else
			key = uuid.generate();
		end
		-- Fail the transaction with the serializer's own message instead of
		-- handing a nil type/value to the INSERT (same pattern as map_store:set).
		local t, value = assert(serialize(value));
		engine:insert("INSERT INTO `prosodyarchive` (`host`, `user`, `store`, `when`, `with`, `key`, `type`, `value`) VALUES (?,?,?,?,?,?,?,?)", host, user or "", store, when, with, key, t, value);
		return key;
	end);
end
182
-- Helpers for building the WHERE clause

--- Append the time-range, `with` and `key` filters of an archive query.
-- Conditions are appended to `where` and their bound values to `args`, in
-- matching order.
-- @param query table with optional fields start, end, with, key
-- @param args array of bound parameters (modified in place)
-- @param where array of SQL conditions (modified in place)
local function archive_where(query, args, where)
	local range_start, range_end = query.start, query["end"];

	-- Time range; SQL BETWEEN includes both endpoints, matching >= and <=.
	if range_start and range_end then
		args[#args+1] = range_start;
		args[#args+1] = range_end;
		where[#where+1] = "`when` BETWEEN ? AND ?";
	elseif range_start then
		args[#args+1] = range_start;
		where[#where+1] = "`when` >= ?";
	elseif range_end then
		args[#args+1] = range_end;
		where[#where+1] = "`when` <= ?";
	end

	-- Related name
	if query.with then
		args[#args+1] = query.with;
		where[#where+1] = "`with` = ?";
	end

	-- Unique id
	if query.key then
		args[#args+1] = query.key;
		where[#where+1] = "`key` = ?";
	end
end
--- Append the exclusive `after` / `before` paging conditions of an archive
-- query.
-- Each condition compares `sort_id` against that of the item with the given
-- key, looked up by a sub-select that is bound to args[1], args[2] and
-- args[3] (used as host, user and store).
-- @param query table with optional fields after, before (item keys)
-- @param args array of bound parameters (modified in place)
-- @param where array of SQL conditions (modified in place)
local function archive_where_id_range(query, args, where)
	local filled = #args;

	-- Adds one condition plus its four bound values after the current end.
	local function add_bound(condition, item_key)
		where[#where+1] = condition;
		args[filled+1], args[filled+2], args[filled+3], args[filled+4] = item_key, args[1], args[2], args[3];
		filled = filled + 4;
	end

	-- Before or after specific item, exclusive (keys better be unique!)
	if query.after then
		add_bound("`sort_id` > (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)", query.after);
	end
	if query.before then
		add_bound("`sort_id` < (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)", query.before);
	end
end
225
--- Query a user's archive.
-- @param username archive owner; nil is queried as the empty user name
-- @param query optional table; fields read here or by the WHERE helpers:
--   start, end, with, key, after, before, limit, reverse, total
-- @return an iterator yielding key, value, when, with for each item, plus
--   the total match count when query.total was set; on failure the
--   transaction status and error
function archive_store:find(username, query)
	query = query or {};
	local user, store = username, self.store;
	local total;
	local ok, result = engine:transaction(function()
		local args = { host, user or "", store, };
		local where = { "`host` = ?", "`user` = ?", "`store` = ?", };

		archive_where(query, args, where);

		-- Total matching; counted before the after/before paging
		-- conditions are added.
		if query.total then
			local stats = engine:select("SELECT COUNT(*) FROM `prosodyarchive` WHERE " .. t_concat(where, " AND "), unpack(args));
			if stats then
				local count_row = stats();
				total = count_row and count_row[1];
			end
			if query.limit == 0 then -- Skip the real query
				return noop, total;
			end
		end

		archive_where_id_range(query, args, where);

		local limit_clause = "";
		if query.limit then
			args[#args+1] = query.limit;
			limit_clause = " LIMIT ?";
		end
		local direction = query.reverse and "DESC" or "ASC";

		local sql_query = ("SELECT `key`, `type`, `value`, `when`, `with` FROM `prosodyarchive` WHERE %s ORDER BY `sort_id` %s%s;"):format(t_concat(where, " AND "), direction, limit_clause);
		module:log("debug", sql_query);
		return engine:select(sql_query, unpack(args));
	end);
	if not ok then return ok, result end
	return function()
		local row = result();
		if row == nil then
			return;
		end
		return row[1], deserialize(row[2], row[3]), row[4], row[5];
	end, total;
end
267
--- Delete items from an archive.
-- Passing `true` as the username drops the user condition, so the delete
-- applies to every user of this store on this host.
-- NOTE(review): with username == true, `args` holds only host and store, but
-- archive_where_id_range() binds args[1..3] as host/user/store for its
-- sub-select, so query.after / query.before look mis-bound in that mode;
-- confirm before relying on that combination.
-- @param username archive owner, nil for the empty user name, or true for all
-- @param query optional table with the same filter fields as find()
-- @return whatever engine:transaction() returns for the delete
function archive_store:delete(username, query)
	query = query or {};
	local user, store = username, self.store;
	return engine:transaction(function()
		local args, where;
		if user == true then
			args = { host, store, };
			where = { "`host` = ?", "`store` = ?", };
		else
			args = { host, user or "", store, };
			where = { "`host` = ?", "`user` = ?", "`store` = ?", };
		end
		archive_where(query, args, where);
		archive_where_id_range(query, args, where);
		local sql_query = ("DELETE FROM `prosodyarchive` WHERE %s;"):format(t_concat(where, " AND "));
		module:log("debug", sql_query);
		return engine:delete(sql_query, unpack(args));
	end);
end
286
-- Store type name -> metatable implementing that store's API.
local stores = {
	keyval = keyval_store;
	map = map_store;
	archive = archive_store;
};

--- Implement storage driver API

-- FIXME: Some of these operations need to operate on the archive store(s) too

local driver = {};

--- Open a store of the given type.
-- @param store store name
-- @param typ store type; defaults to "keyval"
-- @return a store object, or nil and "unsupported-store" for an unknown type
function driver:open(store, typ)
	local store_mt = stores[typ or "keyval"];
	if not store_mt then
		return nil, "unsupported-store";
	end
	return setmetatable({ store = store }, store_mt);
end
306
--- List the distinct store names present in the `prosody` table for a user.
-- @param username a user name; nil/false selects rows with the empty user
--   name; true selects rows whose user name is not empty
-- @return an iterator built from the query result, or the failed
--   transaction's status and error
function driver:stores(username)
	local comparison = (username == true) and "!=?" or "=?";
	local query = "SELECT DISTINCT `store` FROM `prosody` WHERE `host`=? AND `user`" .. comparison;
	if username == true or not username then
		username = "";
	end
	local ok, result = engine:transaction(function()
		return engine:select(query, host, username);
	end);
	if ok then
		return iterator(result);
	end
	return ok, result;
end
319
--- Delete every row of the `prosody` table belonging to a user on this host.
-- Only the `prosody` table is touched; `prosodyarchive` is not.
-- @param username user whose data is removed
-- @return transaction status, then true and the second result of
--   engine:delete()
function driver:purge(username)
	local function remove_rows()
		local _, err = engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=?", host, username);
		return true, err;
	end
	return engine:transaction(remove_rows);
end
326
327 --- Initialization
328
329
330 local function create_table(name)
331 local Table, Column, Index = sql.Table, sql.Column, sql.Index;
30 332
31 local ProsodyTable = Table { 333 local ProsodyTable = Table {
32 name="prosody"; 334 name= name or "prosody";
33 Column { name="host", type="TEXT", nullable=false }; 335 Column { name="host", type="TEXT", nullable=false };
34 Column { name="user", type="TEXT", nullable=false }; 336 Column { name="user", type="TEXT", nullable=false };
35 Column { name="store", type="TEXT", nullable=false }; 337 Column { name="store", type="TEXT", nullable=false };
36 Column { name="key", type="TEXT", nullable=false }; 338 Column { name="key", type="TEXT", nullable=false };
37 Column { name="type", type="TEXT", nullable=false }; 339 Column { name="type", type="TEXT", nullable=false };
58 engine:transaction(function() 360 engine:transaction(function()
59 ProsodyArchiveTable:create(engine); 361 ProsodyArchiveTable:create(engine);
60 end); 362 end);
61 end 363 end
62 364
--- Check whether the `prosody` table uses an outdated schema and optionally
-- upgrade it. Only the MySQL driver is inspected; for other drivers there is
-- nothing to do.
-- Two things are checked: a `value` column still typed `text` (upgraded to
-- MEDIUMTEXT), and columns that are not utf8 / utf8_bin (re-encoded).
-- @param params the SQL connection parameters (only `driver` is read)
-- @param apply_changes when true, perform the upgrade instead of only
--   detecting that one is needed
-- @return true plus a boolean telling whether an outdated schema was found,
--   or false if a check or upgrade failed
local function upgrade_table(params, apply_changes)
	local changes = false;
	if params.driver == "MySQL" then
		local success,err = engine:transaction(function()
			local result = engine:execute("SHOW COLUMNS FROM prosody WHERE Field='value' and Type='text'");
			if result:rowcount() > 0 then
				changes = true;
				if apply_changes then
					module:log("info", "Upgrading database schema...");
					engine:execute("ALTER TABLE prosody MODIFY COLUMN `value` MEDIUMTEXT");
					module:log("info", "Database table automatically upgraded");
				end
			end
			return true;
		end);
		if not success then
			module:log("error", "Failed to check/upgrade database schema (%s), please see "
				.."http://prosody.im/doc/mysql for help",
				err or "unknown error");
			return false;
		end

		-- COMPAT w/pre-0.10: Upgrade table to UTF-8 if not already
		local check_encoding_query = "SELECT `COLUMN_NAME`,`COLUMN_TYPE` FROM `information_schema`.`columns` WHERE `TABLE_NAME`='prosody' AND ( `CHARACTER_SET_NAME`!='utf8' OR `COLLATION_NAME`!='utf8_bin' );";
		success,err = engine:transaction(function()
			local result = engine:execute(check_encoding_query);
			local n_bad_columns = result:rowcount();
			if n_bad_columns > 0 then
				changes = true;
				if apply_changes then
					module:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns);
					-- Each column goes through BLOB first and is then
					-- changed back to its own type with the utf8 charset.
					local fix_column_query1 = "ALTER TABLE `prosody` CHANGE `%s` `%s` BLOB;";
					local fix_column_query2 = "ALTER TABLE `prosody` CHANGE `%s` `%s` %s CHARACTER SET 'utf8' COLLATE 'utf8_bin';";
					for row in result:rows() do
						local column_name, column_type = unpack(row);
						engine:execute(fix_column_query1:format(column_name, column_name));
						engine:execute(fix_column_query2:format(column_name, column_name, column_type));
					end
					module:log("info", "Database encoding upgrade complete!");
				end
			end
		end);
		-- Report a failed conversion here; otherwise the re-check below
		-- would overwrite `success` and hide it.
		if not success then
			module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error");
			return false;
		end
		success,err = engine:transaction(function() return engine:execute(check_encoding_query); end);
		if not success then
			module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error");
			return false;
		end
	end
	-- Callers test the first result for truthiness, so success must be
	-- reported explicitly; the detection flag rides along as a second value.
	return true, changes;
end
104 do -- process options to get a db connection 414
--- Validate the SQL connection parameters and fill in driver defaults.
-- For SQLite3 the database file name defaults to "prosody.sqlite" and is
-- resolved relative to Prosody's data directory.
-- @param params connection parameters (modified in place)
-- @return the same params table
-- Raises if the driver or the database is still unspecified after defaults.
local function normalize_params(params)
	-- Apply the SQLite default first: the built-in default configuration
	-- names only the driver, so validating before this would always fail.
	if params.driver == "SQLite3" then
		params.database = resolve_relative_path(prosody.paths.data or ".", params.database or "prosody.sqlite");
	end
	assert(params.driver and params.database, "Configuration error: Both the SQL driver and the database need to be specified");
	return params;
end
112 422
--- Module entry point: create the SQL engine and register the storage driver.
-- Does nothing when running under prosodyctl.
function module.load()
	if prosody.prosodyctl then return; end
	local params = normalize_params(module:get_option("sql", default_params));
	-- NOTE(review): the callback's `engine` parameter shadows the module-level
	-- `engine`, while create_table()/upgrade_table() use the module-level one,
	-- which is only assigned once create_engine() returns — confirm when the
	-- callback is invoked.
	engine = sql:create_engine(params, function (engine)
		if module:get_option("sql_manage_tables", true) then
			-- Automatically create table, ignore failure (table probably already exists)
			-- FIXME: we should check in information_schema, etc.
			create_table();
			-- Check whether the table needs upgrading. Only an explicit
			-- `false` signals failure; a missing return value must not be
			-- mistaken for one.
			if upgrade_table(params, true) == false then
				module:log("error", "Old database format detected, and upgrade failed");
				return false, "database upgrade needed";
			end
		end
	end);

	module:provides("storage", driver);
end
131 local value,err = json.encode(value);
132 if value then return "json", value; end
133 return nil, err;
134 end
135 return nil, "Unhandled value type: "..t;
136 end
137 local function deserialize(t, value)
138 if t == "string" then return value;
139 elseif t == "boolean" then
140 if value == "true" then return true;
141 elseif value == "false" then return false; end
142 elseif t == "number" then return tonumber(value);
143 elseif t == "json" then
144 return json.decode(value);
145 elseif t == "xml" then
146 return xml_parse(value);
147 end
148 end
149
150 local host = module.host;
151 local user, store;
152
153 local function keyval_store_get()
154 local haveany;
155 local result = {};
156 for row in engine:select("SELECT `key`,`type`,`value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store) do
157 haveany = true;
158 local k = row[1];
159 local v = deserialize(row[2], row[3]);
160 if k and v then
161 if k ~= "" then result[k] = v; elseif type(v) == "table" then
162 for a,b in pairs(v) do
163 result[a] = b;
164 end
165 end
166 end
167 end
168 if haveany then
169 return result;
170 end
171 end
172 local function keyval_store_set(data)
173 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store);
174
175 if data and next(data) ~= nil then
176 local extradata = {};
177 for key, value in pairs(data) do
178 if type(key) == "string" and key ~= "" then
179 local t, value = serialize(value);
180 assert(t, value);
181 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, key, t, value);
182 else
183 extradata[key] = value;
184 end
185 end
186 if next(extradata) ~= nil then
187 local t, extradata = serialize(extradata);
188 assert(t, extradata);
189 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, "", t, extradata);
190 end
191 end
192 return true;
193 end
194
195 local keyval_store = {};
196 keyval_store.__index = keyval_store;
197 function keyval_store:get(username)
198 user,store = username,self.store;
199 local ok, result = engine:transaction(keyval_store_get);
200 if not ok then return ok, result; end
201 return result;
202 end
203 function keyval_store:set(username, data)
204 user,store = username,self.store;
205 return engine:transaction(function()
206 return keyval_store_set(data);
207 end);
208 end
209 function keyval_store:users()
210 local ok, result = engine:transaction(function()
211 return engine:select("SELECT DISTINCT `user` FROM `prosody` WHERE `host`=? AND `store`=?", host, self.store);
212 end);
213 if not ok then return ok, result end
214 return iterator(result);
215 end
216
217 local map_store = {};
218 map_store.__index = map_store;
219 function map_store:get(username, key)
220 local ok, result = engine:transaction(function()
221 if type(key) == "string" and key ~= "" then
222 for row in engine:select("SELECT `type`, `value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, username or "", self.store, key) do
223 return deserialize(row[1], row[2]);
224 end
225 else
226 error("TODO: non-string keys");
227 end
228 end);
229 if not ok then return nil, result; end
230 return result;
231 end
232 function map_store:set(username, key, data)
233 local ok, result = engine:transaction(function()
234 if type(key) == "string" and key ~= "" then
235 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?",
236 host, username or "", self.store, key);
237 if data ~= nil then
238 local t, value = assert(serialize(data));
239 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, username or "", self.store, key, t, value);
240 end
241 else
242 error("TODO: non-string keys");
243 end
244 return true;
245 end);
246 if not ok then return nil, result; end
247 return result;
248 end
249
250 local archive_store = {}
251 archive_store.caps = {
252 total = true;
253 };
254 archive_store.__index = archive_store
255 function archive_store:append(username, key, value, when, with)
256 if type(when) ~= "number" then
257 when, with, value = value, when, with;
258 end
259 local user,store = username,self.store;
260 return engine:transaction(function()
261 if key then
262 engine:delete("DELETE FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, user or "", store, key);
263 else
264 key = uuid.generate();
265 end
266 local t, value = serialize(value);
267 engine:insert("INSERT INTO `prosodyarchive` (`host`, `user`, `store`, `when`, `with`, `key`, `type`, `value`) VALUES (?,?,?,?,?,?,?,?)", host, user or "", store, when, with, key, t, value);
268 return key;
269 end);
270 end
271
272 -- Helpers for building the WHERE clause
273 local function archive_where(query, args, where)
274 -- Time range, inclusive
275 if query.start then
276 args[#args+1] = query.start
277 where[#where+1] = "`when` >= ?"
278 end
279
280 if query["end"] then
281 args[#args+1] = query["end"];
282 if query.start then
283 where[#where] = "`when` BETWEEN ? AND ?" -- is this inclusive?
284 else
285 where[#where+1] = "`when` <= ?"
286 end
287 end
288
289 -- Related name
290 if query.with then
291 where[#where+1] = "`with` = ?";
292 args[#args+1] = query.with
293 end
294
295 -- Unique id
296 if query.key then
297 where[#where+1] = "`key` = ?";
298 args[#args+1] = query.key
299 end
300 end
301 local function archive_where_id_range(query, args, where)
302 local args_len = #args
303 -- Before or after specific item, exclusive
304 if query.after then -- keys better be unique!
305 where[#where+1] = "`sort_id` > (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)"
306 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3];
307 args_len = args_len + 4
308 end
309 if query.before then
310 where[#where+1] = "`sort_id` < (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)"
311 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3];
312 end
313 end
314
315 function archive_store:find(username, query)
316 query = query or {};
317 local user,store = username,self.store;
318 local total;
319 local ok, result = engine:transaction(function()
320 local sql_query = "SELECT `key`, `type`, `value`, `when`, `with` FROM `prosodyarchive` WHERE %s ORDER BY `sort_id` %s%s;";
321 local args = { host, user or "", store, };
322 local where = { "`host` = ?", "`user` = ?", "`store` = ?", };
323
324 archive_where(query, args, where);
325
326 -- Total matching
327 if query.total then
328 local stats = engine:select("SELECT COUNT(*) FROM `prosodyarchive` WHERE " .. t_concat(where, " AND "), unpack(args));
329 if stats then
330 local _total = stats()
331 total = _total and _total[1];
332 end
333 if query.limit == 0 then -- Skip the real query
334 return noop, total;
335 end
336 end
337
338 archive_where_id_range(query, args, where);
339
340 if query.limit then
341 args[#args+1] = query.limit;
342 end
343
344 sql_query = sql_query:format(t_concat(where, " AND "), query.reverse and "DESC" or "ASC", query.limit and " LIMIT ?" or "");
345 module:log("debug", sql_query);
346 return engine:select(sql_query, unpack(args));
347 end);
348 if not ok then return ok, result end
349 return function()
350 local row = result();
351 if row ~= nil then
352 return row[1], deserialize(row[2], row[3]), row[4], row[5];
353 end
354 end, total;
355 end
356
357 function archive_store:delete(username, query)
358 query = query or {};
359 local user,store = username,self.store;
360 return engine:transaction(function()
361 local sql_query = "DELETE FROM `prosodyarchive` WHERE %s;";
362 local args = { host, user or "", store, };
363 local where = { "`host` = ?", "`user` = ?", "`store` = ?", };
364 if user == true then
365 table.remove(args, 2);
366 table.remove(where, 2);
367 end
368 archive_where(query, args, where);
369 archive_where_id_range(query, args, where);
370 sql_query = sql_query:format(t_concat(where, " AND "));
371 module:log("debug", sql_query);
372 return engine:delete(sql_query, unpack(args));
373 end);
374 end
375
376 local stores = {
377 keyval = keyval_store;
378 map = map_store;
379 archive = archive_store;
380 };
381
382 local driver = {};
383
384 function driver:open(store, typ)
385 local store_mt = stores[typ or "keyval"];
386 if store_mt then
387 return setmetatable({ store = store }, store_mt);
388 end
389 return nil, "unsupported-store";
390 end
391
392 function driver:stores(username)
393 local sql = "SELECT DISTINCT `store` FROM `prosody` WHERE `host`=? AND `user`" ..
394 (username == true and "!=?" or "=?");
395 if username == true or not username then
396 username = "";
397 end
398 local ok, result = engine:transaction(function()
399 return engine:select(sql, host, username);
400 end);
401 if not ok then return ok, result end
402 return iterator(result);
403 end
404
405 function driver:purge(username)
406 return engine:transaction(function()
407 local stmt,err = engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=?", host, username);
408 return true,err;
409 end);
410 end
411
412 module:provides("storage", driver);
413
414