Software /
code /
prosody
Comparison
plugins/mod_storage_sql2.lua @ 6745:6728ad041761
Merge 0.10->trunk
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Thu, 25 Jun 2015 18:57:43 +0200 |
parent | 6729:daa5c83b2879 |
parent | 6740:99b3f29c3c71 |
child | 6749:9dc05d3efdc9 |
comparison
equal
deleted
inserted
replaced
6729:daa5c83b2879 | 6745:6728ad041761 |
---|---|
1 | 1 |
2 local json = require "util.json"; | 2 local json = require "util.json"; |
3 local sql = require "util.sql"; | |
3 local xml_parse = require "util.xml".parse; | 4 local xml_parse = require "util.xml".parse; |
4 local uuid = require "util.uuid"; | 5 local uuid = require "util.uuid"; |
5 local resolve_relative_path = require "util.paths".resolve_relative_path; | 6 local resolve_relative_path = require "util.paths".resolve_relative_path; |
6 | 7 |
7 local stanza_mt = require"util.stanza".stanza_mt; | 8 local stanza_mt = require"util.stanza".stanza_mt; |
18 return unpack(row); | 19 return unpack(row); |
19 end | 20 end |
20 end, result, nil; | 21 end, result, nil; |
21 end | 22 end |
22 | 23 |
23 local mod_sql = module:require("sql"); | 24 local default_params = { driver = "SQLite3" }; |
24 local params = module:get_option("sql"); | 25 |
25 | 26 local engine; |
26 local engine; -- TODO create engine | 27 |
27 | 28 local function serialize(value) |
28 local function create_table() | 29 local t = type(value); |
29 local Table,Column,Index = mod_sql.Table,mod_sql.Column,mod_sql.Index; | 30 if t == "string" or t == "boolean" or t == "number" then |
31 return t, tostring(value); | |
32 elseif is_stanza(value) then | |
33 return "xml", tostring(value); | |
34 elseif t == "table" then | |
35 local value,err = json.encode(value); | |
36 if value then return "json", value; end | |
37 return nil, err; | |
38 end | |
39 return nil, "Unhandled value type: "..t; | |
40 end | |
41 local function deserialize(t, value) | |
42 if t == "string" then return value; | |
43 elseif t == "boolean" then | |
44 if value == "true" then return true; | |
45 elseif value == "false" then return false; end | |
46 elseif t == "number" then return tonumber(value); | |
47 elseif t == "json" then | |
48 return json.decode(value); | |
49 elseif t == "xml" then | |
50 return xml_parse(value); | |
51 end | |
52 end | |
53 | |
54 local host = module.host; | |
55 local user, store; | |
56 | |
57 local function keyval_store_get() | |
58 local haveany; | |
59 local result = {}; | |
60 for row in engine:select("SELECT `key`,`type`,`value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store) do | |
61 haveany = true; | |
62 local k = row[1]; | |
63 local v = deserialize(row[2], row[3]); | |
64 if k and v then | |
65 if k ~= "" then result[k] = v; elseif type(v) == "table" then | |
66 for a,b in pairs(v) do | |
67 result[a] = b; | |
68 end | |
69 end | |
70 end | |
71 end | |
72 if haveany then | |
73 return result; | |
74 end | |
75 end | |
76 local function keyval_store_set(data) | |
77 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store); | |
78 | |
79 if data and next(data) ~= nil then | |
80 local extradata = {}; | |
81 for key, value in pairs(data) do | |
82 if type(key) == "string" and key ~= "" then | |
83 local t, value = serialize(value); | |
84 assert(t, value); | |
85 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, key, t, value); | |
86 else | |
87 extradata[key] = value; | |
88 end | |
89 end | |
90 if next(extradata) ~= nil then | |
91 local t, extradata = serialize(extradata); | |
92 assert(t, extradata); | |
93 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, "", t, extradata); | |
94 end | |
95 end | |
96 return true; | |
97 end | |
98 | |
99 --- Key/value store API (default store type) | |
100 | |
101 local keyval_store = {}; | |
102 keyval_store.__index = keyval_store; | |
103 function keyval_store:get(username) | |
104 user, store = username, self.store; | |
105 local ok, result = engine:transaction(keyval_store_get); | |
106 if not ok then | |
107 module:log("error", "Unable to read from database %s store for %s: %s", store, username or "<host>", result); | |
108 return nil, result; | |
109 end | |
110 return result; | |
111 end | |
112 function keyval_store:set(username, data) | |
113 user,store = username,self.store; | |
114 return engine:transaction(function() | |
115 return keyval_store_set(data); | |
116 end); | |
117 end | |
118 function keyval_store:users() | |
119 local ok, result = engine:transaction(function() | |
120 return engine:select("SELECT DISTINCT `user` FROM `prosody` WHERE `host`=? AND `store`=?", host, self.store); | |
121 end); | |
122 if not ok then return ok, result end | |
123 return iterator(result); | |
124 end | |
125 | |
126 --- Archive store API | |
127 | |
128 local map_store = {}; | |
129 map_store.__index = map_store; | |
130 function map_store:get(username, key) | |
131 local ok, result = engine:transaction(function() | |
132 if type(key) == "string" and key ~= "" then | |
133 for row in engine:select("SELECT `type`, `value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, username or "", self.store, key) do | |
134 return deserialize(row[1], row[2]); | |
135 end | |
136 else | |
137 error("TODO: non-string keys"); | |
138 end | |
139 end); | |
140 if not ok then return nil, result; end | |
141 return result; | |
142 end | |
143 function map_store:set(username, key, data) | |
144 local ok, result = engine:transaction(function() | |
145 if type(key) == "string" and key ~= "" then | |
146 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", | |
147 host, username or "", self.store, key); | |
148 if data ~= nil then | |
149 local t, value = assert(serialize(data)); | |
150 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, username or "", self.store, key, t, value); | |
151 end | |
152 else | |
153 error("TODO: non-string keys"); | |
154 end | |
155 return true; | |
156 end); | |
157 if not ok then return nil, result; end | |
158 return result; | |
159 end | |
160 | |
161 local archive_store = {} | |
162 archive_store.caps = { | |
163 total = true; | |
164 }; | |
165 archive_store.__index = archive_store | |
166 function archive_store:append(username, key, value, when, with) | |
167 if type(when) ~= "number" then | |
168 when, with, value = value, when, with; | |
169 end | |
170 local user,store = username,self.store; | |
171 return engine:transaction(function() | |
172 if key then | |
173 engine:delete("DELETE FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, user or "", store, key); | |
174 else | |
175 key = uuid.generate(); | |
176 end | |
177 local t, value = serialize(value); | |
178 engine:insert("INSERT INTO `prosodyarchive` (`host`, `user`, `store`, `when`, `with`, `key`, `type`, `value`) VALUES (?,?,?,?,?,?,?,?)", host, user or "", store, when, with, key, t, value); | |
179 return key; | |
180 end); | |
181 end | |
182 | |
183 -- Helpers for building the WHERE clause | |
184 local function archive_where(query, args, where) | |
185 -- Time range, inclusive | |
186 if query.start then | |
187 args[#args+1] = query.start | |
188 where[#where+1] = "`when` >= ?" | |
189 end | |
190 | |
191 if query["end"] then | |
192 args[#args+1] = query["end"]; | |
193 if query.start then | |
194 where[#where] = "`when` BETWEEN ? AND ?" -- is this inclusive? | |
195 else | |
196 where[#where+1] = "`when` <= ?" | |
197 end | |
198 end | |
199 | |
200 -- Related name | |
201 if query.with then | |
202 where[#where+1] = "`with` = ?"; | |
203 args[#args+1] = query.with | |
204 end | |
205 | |
206 -- Unique id | |
207 if query.key then | |
208 where[#where+1] = "`key` = ?"; | |
209 args[#args+1] = query.key | |
210 end | |
211 end | |
212 local function archive_where_id_range(query, args, where) | |
213 local args_len = #args | |
214 -- Before or after specific item, exclusive | |
215 if query.after then -- keys better be unique! | |
216 where[#where+1] = "`sort_id` > (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
217 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3]; | |
218 args_len = args_len + 4 | |
219 end | |
220 if query.before then | |
221 where[#where+1] = "`sort_id` < (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
222 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3]; | |
223 end | |
224 end | |
225 | |
226 function archive_store:find(username, query) | |
227 query = query or {}; | |
228 local user,store = username,self.store; | |
229 local total; | |
230 local ok, result = engine:transaction(function() | |
231 local sql_query = "SELECT `key`, `type`, `value`, `when`, `with` FROM `prosodyarchive` WHERE %s ORDER BY `sort_id` %s%s;"; | |
232 local args = { host, user or "", store, }; | |
233 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
234 | |
235 archive_where(query, args, where); | |
236 | |
237 -- Total matching | |
238 if query.total then | |
239 local stats = engine:select("SELECT COUNT(*) FROM `prosodyarchive` WHERE " .. t_concat(where, " AND "), unpack(args)); | |
240 if stats then | |
241 local _total = stats() | |
242 total = _total and _total[1]; | |
243 end | |
244 if query.limit == 0 then -- Skip the real query | |
245 return noop, total; | |
246 end | |
247 end | |
248 | |
249 archive_where_id_range(query, args, where); | |
250 | |
251 if query.limit then | |
252 args[#args+1] = query.limit; | |
253 end | |
254 | |
255 sql_query = sql_query:format(t_concat(where, " AND "), query.reverse and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); | |
256 module:log("debug", sql_query); | |
257 return engine:select(sql_query, unpack(args)); | |
258 end); | |
259 if not ok then return ok, result end | |
260 return function() | |
261 local row = result(); | |
262 if row ~= nil then | |
263 return row[1], deserialize(row[2], row[3]), row[4], row[5]; | |
264 end | |
265 end, total; | |
266 end | |
267 | |
268 function archive_store:delete(username, query) | |
269 query = query or {}; | |
270 local user,store = username,self.store; | |
271 return engine:transaction(function() | |
272 local sql_query = "DELETE FROM `prosodyarchive` WHERE %s;"; | |
273 local args = { host, user or "", store, }; | |
274 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
275 if user == true then | |
276 table.remove(args, 2); | |
277 table.remove(where, 2); | |
278 end | |
279 archive_where(query, args, where); | |
280 archive_where_id_range(query, args, where); | |
281 sql_query = sql_query:format(t_concat(where, " AND ")); | |
282 module:log("debug", sql_query); | |
283 return engine:delete(sql_query, unpack(args)); | |
284 end); | |
285 end | |
286 | |
287 local stores = { | |
288 keyval = keyval_store; | |
289 map = map_store; | |
290 archive = archive_store; | |
291 }; | |
292 | |
293 --- Implement storage driver API | |
294 | |
295 -- FIXME: Some of these operations need to operate on the archive store(s) too | |
296 | |
297 local driver = {}; | |
298 | |
299 function driver:open(store, typ) | |
300 local store_mt = stores[typ or "keyval"]; | |
301 if store_mt then | |
302 return setmetatable({ store = store }, store_mt); | |
303 end | |
304 return nil, "unsupported-store"; | |
305 end | |
306 | |
307 function driver:stores(username) | |
308 local query = "SELECT DISTINCT `store` FROM `prosody` WHERE `host`=? AND `user`" .. | |
309 (username == true and "!=?" or "=?"); | |
310 if username == true or not username then | |
311 username = ""; | |
312 end | |
313 local ok, result = engine:transaction(function() | |
314 return engine:select(query, host, username); | |
315 end); | |
316 if not ok then return ok, result end | |
317 return iterator(result); | |
318 end | |
319 | |
320 function driver:purge(username) | |
321 return engine:transaction(function() | |
322 local stmt,err = engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=?", host, username); | |
323 return true, err; | |
324 end); | |
325 end | |
326 | |
327 --- Initialization | |
328 | |
329 | |
330 local function create_table(name) | |
331 local Table, Column, Index = sql.Table, sql.Column, sql.Index; | |
30 | 332 |
31 local ProsodyTable = Table { | 333 local ProsodyTable = Table { |
32 name="prosody"; | 334 name= name or "prosody"; |
33 Column { name="host", type="TEXT", nullable=false }; | 335 Column { name="host", type="TEXT", nullable=false }; |
34 Column { name="user", type="TEXT", nullable=false }; | 336 Column { name="user", type="TEXT", nullable=false }; |
35 Column { name="store", type="TEXT", nullable=false }; | 337 Column { name="store", type="TEXT", nullable=false }; |
36 Column { name="key", type="TEXT", nullable=false }; | 338 Column { name="key", type="TEXT", nullable=false }; |
37 Column { name="type", type="TEXT", nullable=false }; | 339 Column { name="type", type="TEXT", nullable=false }; |
58 engine:transaction(function() | 360 engine:transaction(function() |
59 ProsodyArchiveTable:create(engine); | 361 ProsodyArchiveTable:create(engine); |
60 end); | 362 end); |
61 end | 363 end |
62 | 364 |
63 local function upgrade_table() | 365 local function upgrade_table(params, apply_changes) |
366 local changes = false; | |
64 if params.driver == "MySQL" then | 367 if params.driver == "MySQL" then |
65 local success,err = engine:transaction(function() | 368 local success,err = engine:transaction(function() |
66 local result = engine:execute("SHOW COLUMNS FROM prosody WHERE Field='value' and Type='text'"); | 369 local result = engine:execute("SHOW COLUMNS FROM prosody WHERE Field='value' and Type='text'"); |
67 if result:rowcount() > 0 then | 370 if result:rowcount() > 0 then |
68 module:log("info", "Upgrading database schema..."); | 371 changes = true; |
69 engine:execute("ALTER TABLE prosody MODIFY COLUMN `value` MEDIUMTEXT"); | 372 if apply_changes then |
70 module:log("info", "Database table automatically upgraded"); | 373 module:log("info", "Upgrading database schema..."); |
374 engine:execute("ALTER TABLE prosody MODIFY COLUMN `value` MEDIUMTEXT"); | |
375 module:log("info", "Database table automatically upgraded"); | |
376 end | |
71 end | 377 end |
72 return true; | 378 return true; |
73 end); | 379 end); |
74 if not success then | 380 if not success then |
75 module:log("error", "Failed to check/upgrade database schema (%s), please see " | 381 module:log("error", "Failed to check/upgrade database schema (%s), please see " |
76 .."http://prosody.im/doc/mysql for help", | 382 .."http://prosody.im/doc/mysql for help", |
77 err or "unknown error"); | 383 err or "unknown error"); |
78 return false; | 384 return false; |
79 end | 385 end |
80 -- COMPAT w/pre-0.9: Upgrade tables to UTF-8 if not already | 386 |
387 -- COMPAT w/pre-0.10: Upgrade table to UTF-8 if not already | |
81 local check_encoding_query = "SELECT `COLUMN_NAME`,`COLUMN_TYPE` FROM `information_schema`.`columns` WHERE `TABLE_NAME`='prosody' AND ( `CHARACTER_SET_NAME`!='utf8' OR `COLLATION_NAME`!='utf8_bin' );"; | 388 local check_encoding_query = "SELECT `COLUMN_NAME`,`COLUMN_TYPE` FROM `information_schema`.`columns` WHERE `TABLE_NAME`='prosody' AND ( `CHARACTER_SET_NAME`!='utf8' OR `COLLATION_NAME`!='utf8_bin' );"; |
82 success,err = engine:transaction(function() | 389 success,err = engine:transaction(function() |
83 local result = engine:execute(check_encoding_query); | 390 local result = engine:execute(check_encoding_query); |
84 local n_bad_columns = result:rowcount(); | 391 local n_bad_columns = result:rowcount(); |
85 if n_bad_columns > 0 then | 392 if n_bad_columns > 0 then |
86 module:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns); | 393 changes = true; |
87 local fix_column_query1 = "ALTER TABLE `prosody` CHANGE `%s` `%s` BLOB;"; | 394 if apply_changes then |
88 local fix_column_query2 = "ALTER TABLE `prosody` CHANGE `%s` `%s` %s CHARACTER SET 'utf8' COLLATE 'utf8_bin';"; | 395 module:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns); |
89 for row in result:rows() do | 396 local fix_column_query1 = "ALTER TABLE `prosody` CHANGE `%s` `%s` BLOB;"; |
90 local column_name, column_type = unpack(row); | 397 local fix_column_query2 = "ALTER TABLE `prosody` CHANGE `%s` `%s` %s CHARACTER SET 'utf8' COLLATE 'utf8_bin';"; |
91 engine:execute(fix_column_query1:format(column_name, column_name)); | 398 for row in result:rows() do |
92 engine:execute(fix_column_query2:format(column_name, column_name, column_type)); | 399 local column_name, column_type = unpack(row); |
400 engine:execute(fix_column_query1:format(column_name, column_name)); | |
401 engine:execute(fix_column_query2:format(column_name, column_name, column_type)); | |
402 end | |
403 module:log("info", "Database encoding upgrade complete!"); | |
93 end | 404 end |
94 module:log("info", "Database encoding upgrade complete!"); | |
95 end | 405 end |
96 end); | 406 end); |
97 success,err = engine:transaction(function() return engine:execute(check_encoding_query); end); | 407 success,err = engine:transaction(function() return engine:execute(check_encoding_query); end); |
98 if not success then | 408 if not success then |
99 module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error"); | 409 module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error"); |
100 end | 410 return false; |
101 end | 411 end |
102 end | 412 end |
103 | 413 end |
104 do -- process options to get a db connection | 414 |
105 params = params or { driver = "SQLite3" }; | 415 local function normalize_params(params) |
106 | 416 assert(params.driver and params.database, "Configuration error: Both the SQL driver and the database need to be specified"); |
107 if params.driver == "SQLite3" then | 417 if params.driver == "SQLite3" then |
108 params.database = resolve_relative_path(prosody.paths.data or ".", params.database or "prosody.sqlite"); | 418 params.database = resolve_relative_path(prosody.paths.data or ".", params.database or "prosody.sqlite"); |
109 end | 419 end |
110 | 420 return params; |
111 assert(params.driver and params.database, "Both the SQL driver and the database need to be specified"); | 421 end |
112 | 422 |
113 --local dburi = db2uri(params); | 423 function module.load() |
114 engine = mod_sql:create_engine(params); | 424 if prosody.prosodyctl then return; end |
115 | 425 local params = normalize_params(module:get_option("sql", default_params)); |
116 if module:get_option("sql_manage_tables", true) then | 426 engine = sql:create_engine(params, function (engine) |
117 -- Automatically create table, ignore failure (table probably already exists) | 427 if module:get_option("sql_manage_tables", true) then |
118 create_table(); | 428 -- Automatically create table, ignore failure (table probably already exists) |
119 -- Encoding mess | 429 -- FIXME: we should check in information_schema, etc. |
120 upgrade_table(); | 430 create_table(); |
121 end | 431 -- Check whether the table needs upgrading |
122 end | 432 if not upgrade_table(params, true) then |
123 | 433 module:log("error", "Old database format detected, and upgrade failed"); |
124 local function serialize(value) | 434 return false, "database upgrade needed"; |
125 local t = type(value); | 435 end |
126 if t == "string" or t == "boolean" or t == "number" then | 436 end |
127 return t, tostring(value); | 437 end); |
128 elseif is_stanza(value) then | 438 |
129 return "xml", tostring(value); | 439 module:provides("storage", driver); |
130 elseif t == "table" then | 440 end |
131 local value,err = json.encode(value); | |
132 if value then return "json", value; end | |
133 return nil, err; | |
134 end | |
135 return nil, "Unhandled value type: "..t; | |
136 end | |
137 local function deserialize(t, value) | |
138 if t == "string" then return value; | |
139 elseif t == "boolean" then | |
140 if value == "true" then return true; | |
141 elseif value == "false" then return false; end | |
142 elseif t == "number" then return tonumber(value); | |
143 elseif t == "json" then | |
144 return json.decode(value); | |
145 elseif t == "xml" then | |
146 return xml_parse(value); | |
147 end | |
148 end | |
149 | |
150 local host = module.host; | |
151 local user, store; | |
152 | |
153 local function keyval_store_get() | |
154 local haveany; | |
155 local result = {}; | |
156 for row in engine:select("SELECT `key`,`type`,`value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store) do | |
157 haveany = true; | |
158 local k = row[1]; | |
159 local v = deserialize(row[2], row[3]); | |
160 if k and v then | |
161 if k ~= "" then result[k] = v; elseif type(v) == "table" then | |
162 for a,b in pairs(v) do | |
163 result[a] = b; | |
164 end | |
165 end | |
166 end | |
167 end | |
168 if haveany then | |
169 return result; | |
170 end | |
171 end | |
172 local function keyval_store_set(data) | |
173 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store); | |
174 | |
175 if data and next(data) ~= nil then | |
176 local extradata = {}; | |
177 for key, value in pairs(data) do | |
178 if type(key) == "string" and key ~= "" then | |
179 local t, value = serialize(value); | |
180 assert(t, value); | |
181 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, key, t, value); | |
182 else | |
183 extradata[key] = value; | |
184 end | |
185 end | |
186 if next(extradata) ~= nil then | |
187 local t, extradata = serialize(extradata); | |
188 assert(t, extradata); | |
189 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, "", t, extradata); | |
190 end | |
191 end | |
192 return true; | |
193 end | |
194 | |
195 local keyval_store = {}; | |
196 keyval_store.__index = keyval_store; | |
197 function keyval_store:get(username) | |
198 user,store = username,self.store; | |
199 local ok, result = engine:transaction(keyval_store_get); | |
200 if not ok then return ok, result; end | |
201 return result; | |
202 end | |
203 function keyval_store:set(username, data) | |
204 user,store = username,self.store; | |
205 return engine:transaction(function() | |
206 return keyval_store_set(data); | |
207 end); | |
208 end | |
209 function keyval_store:users() | |
210 local ok, result = engine:transaction(function() | |
211 return engine:select("SELECT DISTINCT `user` FROM `prosody` WHERE `host`=? AND `store`=?", host, self.store); | |
212 end); | |
213 if not ok then return ok, result end | |
214 return iterator(result); | |
215 end | |
216 | |
217 local map_store = {}; | |
218 map_store.__index = map_store; | |
219 function map_store:get(username, key) | |
220 local ok, result = engine:transaction(function() | |
221 if type(key) == "string" and key ~= "" then | |
222 for row in engine:select("SELECT `type`, `value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, username or "", self.store, key) do | |
223 return deserialize(row[1], row[2]); | |
224 end | |
225 else | |
226 error("TODO: non-string keys"); | |
227 end | |
228 end); | |
229 if not ok then return nil, result; end | |
230 return result; | |
231 end | |
232 function map_store:set(username, key, data) | |
233 local ok, result = engine:transaction(function() | |
234 if type(key) == "string" and key ~= "" then | |
235 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", | |
236 host, username or "", self.store, key); | |
237 if data ~= nil then | |
238 local t, value = assert(serialize(data)); | |
239 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, username or "", self.store, key, t, value); | |
240 end | |
241 else | |
242 error("TODO: non-string keys"); | |
243 end | |
244 return true; | |
245 end); | |
246 if not ok then return nil, result; end | |
247 return result; | |
248 end | |
249 | |
250 local archive_store = {} | |
251 archive_store.caps = { | |
252 total = true; | |
253 }; | |
254 archive_store.__index = archive_store | |
255 function archive_store:append(username, key, value, when, with) | |
256 if type(when) ~= "number" then | |
257 when, with, value = value, when, with; | |
258 end | |
259 local user,store = username,self.store; | |
260 return engine:transaction(function() | |
261 if key then | |
262 engine:delete("DELETE FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, user or "", store, key); | |
263 else | |
264 key = uuid.generate(); | |
265 end | |
266 local t, value = serialize(value); | |
267 engine:insert("INSERT INTO `prosodyarchive` (`host`, `user`, `store`, `when`, `with`, `key`, `type`, `value`) VALUES (?,?,?,?,?,?,?,?)", host, user or "", store, when, with, key, t, value); | |
268 return key; | |
269 end); | |
270 end | |
271 | |
272 -- Helpers for building the WHERE clause | |
273 local function archive_where(query, args, where) | |
274 -- Time range, inclusive | |
275 if query.start then | |
276 args[#args+1] = query.start | |
277 where[#where+1] = "`when` >= ?" | |
278 end | |
279 | |
280 if query["end"] then | |
281 args[#args+1] = query["end"]; | |
282 if query.start then | |
283 where[#where] = "`when` BETWEEN ? AND ?" -- is this inclusive? | |
284 else | |
285 where[#where+1] = "`when` <= ?" | |
286 end | |
287 end | |
288 | |
289 -- Related name | |
290 if query.with then | |
291 where[#where+1] = "`with` = ?"; | |
292 args[#args+1] = query.with | |
293 end | |
294 | |
295 -- Unique id | |
296 if query.key then | |
297 where[#where+1] = "`key` = ?"; | |
298 args[#args+1] = query.key | |
299 end | |
300 end | |
301 local function archive_where_id_range(query, args, where) | |
302 local args_len = #args | |
303 -- Before or after specific item, exclusive | |
304 if query.after then -- keys better be unique! | |
305 where[#where+1] = "`sort_id` > (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
306 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3]; | |
307 args_len = args_len + 4 | |
308 end | |
309 if query.before then | |
310 where[#where+1] = "`sort_id` < (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
311 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3]; | |
312 end | |
313 end | |
314 | |
315 function archive_store:find(username, query) | |
316 query = query or {}; | |
317 local user,store = username,self.store; | |
318 local total; | |
319 local ok, result = engine:transaction(function() | |
320 local sql_query = "SELECT `key`, `type`, `value`, `when`, `with` FROM `prosodyarchive` WHERE %s ORDER BY `sort_id` %s%s;"; | |
321 local args = { host, user or "", store, }; | |
322 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
323 | |
324 archive_where(query, args, where); | |
325 | |
326 -- Total matching | |
327 if query.total then | |
328 local stats = engine:select("SELECT COUNT(*) FROM `prosodyarchive` WHERE " .. t_concat(where, " AND "), unpack(args)); | |
329 if stats then | |
330 local _total = stats() | |
331 total = _total and _total[1]; | |
332 end | |
333 if query.limit == 0 then -- Skip the real query | |
334 return noop, total; | |
335 end | |
336 end | |
337 | |
338 archive_where_id_range(query, args, where); | |
339 | |
340 if query.limit then | |
341 args[#args+1] = query.limit; | |
342 end | |
343 | |
344 sql_query = sql_query:format(t_concat(where, " AND "), query.reverse and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); | |
345 module:log("debug", sql_query); | |
346 return engine:select(sql_query, unpack(args)); | |
347 end); | |
348 if not ok then return ok, result end | |
349 return function() | |
350 local row = result(); | |
351 if row ~= nil then | |
352 return row[1], deserialize(row[2], row[3]), row[4], row[5]; | |
353 end | |
354 end, total; | |
355 end | |
356 | |
357 function archive_store:delete(username, query) | |
358 query = query or {}; | |
359 local user,store = username,self.store; | |
360 return engine:transaction(function() | |
361 local sql_query = "DELETE FROM `prosodyarchive` WHERE %s;"; | |
362 local args = { host, user or "", store, }; | |
363 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
364 if user == true then | |
365 table.remove(args, 2); | |
366 table.remove(where, 2); | |
367 end | |
368 archive_where(query, args, where); | |
369 archive_where_id_range(query, args, where); | |
370 sql_query = sql_query:format(t_concat(where, " AND ")); | |
371 module:log("debug", sql_query); | |
372 return engine:delete(sql_query, unpack(args)); | |
373 end); | |
374 end | |
375 | |
376 local stores = { | |
377 keyval = keyval_store; | |
378 map = map_store; | |
379 archive = archive_store; | |
380 }; | |
381 | |
382 local driver = {}; | |
383 | |
384 function driver:open(store, typ) | |
385 local store_mt = stores[typ or "keyval"]; | |
386 if store_mt then | |
387 return setmetatable({ store = store }, store_mt); | |
388 end | |
389 return nil, "unsupported-store"; | |
390 end | |
391 | |
392 function driver:stores(username) | |
393 local sql = "SELECT DISTINCT `store` FROM `prosody` WHERE `host`=? AND `user`" .. | |
394 (username == true and "!=?" or "=?"); | |
395 if username == true or not username then | |
396 username = ""; | |
397 end | |
398 local ok, result = engine:transaction(function() | |
399 return engine:select(sql, host, username); | |
400 end); | |
401 if not ok then return ok, result end | |
402 return iterator(result); | |
403 end | |
404 | |
405 function driver:purge(username) | |
406 return engine:transaction(function() | |
407 local stmt,err = engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=?", host, username); | |
408 return true,err; | |
409 end); | |
410 end | |
411 | |
412 module:provides("storage", driver); | |
413 | |
414 |