Software / code / prosody
Comparison
plugins/mod_storage_sql2.lua @ 6739:6216743c188c
mod_storage_sql2: Break up monolithic code into functions, theoretically no functionality changes.
author | Matthew Wild <mwild1@gmail.com> |
---|---|
date | Wed, 24 Jun 2015 23:24:32 +0100 |
parent | 6738:845bc5ba306d |
child | 6740:99b3f29c3c71 |
comparison legend: equal, deleted, inserted, replaced
6738:845bc5ba306d | 6739:6216743c188c |
---|---|
1 | 1 |
2 local json = require "util.json"; | 2 local json = require "util.json"; |
3 local sql = require "util.sql"; | |
3 local xml_parse = require "util.xml".parse; | 4 local xml_parse = require "util.xml".parse; |
4 local uuid = require "util.uuid"; | 5 local uuid = require "util.uuid"; |
5 local resolve_relative_path = require "util.paths".resolve_relative_path; | 6 local resolve_relative_path = require "util.paths".resolve_relative_path; |
6 | 7 |
7 local stanza_mt = require"util.stanza".stanza_mt; | 8 local stanza_mt = require"util.stanza".stanza_mt; |
18 return unpack(row); | 19 return unpack(row); |
19 end | 20 end |
20 end, result, nil; | 21 end, result, nil; |
21 end | 22 end |
22 | 23 |
23 local mod_sql = module:require("sql"); | 24 local default_params = { driver = "SQLite3" }; |
24 local params = module:get_option("sql"); | 25 |
25 | 26 local engine; |
26 local engine; -- TODO create engine | 27 |
27 | 28 local function serialize(value) |
28 local function create_table() | 29 local t = type(value); |
29 local Table,Column,Index = mod_sql.Table,mod_sql.Column,mod_sql.Index; | 30 if t == "string" or t == "boolean" or t == "number" then |
31 return t, tostring(value); | |
32 elseif is_stanza(value) then | |
33 return "xml", tostring(value); | |
34 elseif t == "table" then | |
35 local value,err = json.encode(value); | |
36 if value then return "json", value; end | |
37 return nil, err; | |
38 end | |
39 return nil, "Unhandled value type: "..t; | |
40 end | |
41 local function deserialize(t, value) | |
42 if t == "string" then return value; | |
43 elseif t == "boolean" then | |
44 if value == "true" then return true; | |
45 elseif value == "false" then return false; end | |
46 elseif t == "number" then return tonumber(value); | |
47 elseif t == "json" then | |
48 return json.decode(value); | |
49 elseif t == "xml" then | |
50 return xml_parse(value); | |
51 end | |
52 end | |
53 | |
54 local host = module.host; | |
55 local user, store; | |
56 | |
57 local function keyval_store_get() | |
58 local haveany; | |
59 local result = {}; | |
60 for row in engine:select("SELECT `key`,`type`,`value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store) do | |
61 haveany = true; | |
62 local k = row[1]; | |
63 local v = deserialize(row[2], row[3]); | |
64 if k and v then | |
65 if k ~= "" then result[k] = v; elseif type(v) == "table" then | |
66 for a,b in pairs(v) do | |
67 result[a] = b; | |
68 end | |
69 end | |
70 end | |
71 end | |
72 if haveany then | |
73 return result; | |
74 end | |
75 end | |
76 local function keyval_store_set(data) | |
77 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store); | |
78 | |
79 if data and next(data) ~= nil then | |
80 local extradata = {}; | |
81 for key, value in pairs(data) do | |
82 if type(key) == "string" and key ~= "" then | |
83 local t, value = serialize(value); | |
84 assert(t, value); | |
85 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, key, t, value); | |
86 else | |
87 extradata[key] = value; | |
88 end | |
89 end | |
90 if next(extradata) ~= nil then | |
91 local t, extradata = serialize(extradata); | |
92 assert(t, extradata); | |
93 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, "", t, extradata); | |
94 end | |
95 end | |
96 return true; | |
97 end | |
98 | |
99 --- Key/value store API (default store type) | |
100 | |
101 local keyval_store = {}; | |
102 keyval_store.__index = keyval_store; | |
103 function keyval_store:get(username) | |
104 user, store = username, self.store; | |
105 local ok, result = engine:transaction(keyval_store_get); | |
106 if not ok then | |
107 module:log("error", "Unable to read from database %s store for %s: %s", store, username or "<host>", result); | |
108 return nil, result; | |
109 end | |
110 return result; | |
111 end | |
112 function keyval_store:set(username, data) | |
113 user,store = username,self.store; | |
114 return engine:transaction(function() | |
115 return keyval_store_set(data); | |
116 end); | |
117 end | |
118 function keyval_store:users() | |
119 local ok, result = engine:transaction(function() | |
120 return engine:select("SELECT DISTINCT `user` FROM `prosody` WHERE `host`=? AND `store`=?", host, self.store); | |
121 end); | |
122 if not ok then return ok, result end | |
123 return iterator(result); | |
124 end | |
125 | |
126 --- Archive store API | |
127 | |
128 local archive_store = {} | |
129 archive_store.__index = archive_store | |
130 function archive_store:append(username, key, when, with, value) | |
131 if value == nil then -- COMPAT early versions | |
132 when, with, value, key = key, when, with, value | |
133 end | |
134 local user,store = username,self.store; | |
135 return engine:transaction(function() | |
136 if key then | |
137 engine:delete("DELETE FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, user or "", store, key); | |
138 else | |
139 key = uuid.generate(); | |
140 end | |
141 local t, value = serialize(value); | |
142 engine:insert("INSERT INTO `prosodyarchive` (`host`, `user`, `store`, `when`, `with`, `key`, `type`, `value`) VALUES (?,?,?,?,?,?,?,?)", host, user or "", store, when, with, key, t, value); | |
143 return key; | |
144 end); | |
145 end | |
146 | |
147 -- Helpers for building the WHERE clause | |
148 local function archive_where(query, args, where) | |
149 -- Time range, inclusive | |
150 if query.start then | |
151 args[#args+1] = query.start | |
152 where[#where+1] = "`when` >= ?" | |
153 end | |
154 | |
155 if query["end"] then | |
156 args[#args+1] = query["end"]; | |
157 if query.start then | |
158 where[#where] = "`when` BETWEEN ? AND ?" -- is this inclusive? | |
159 else | |
160 where[#where+1] = "`when` <= ?" | |
161 end | |
162 end | |
163 | |
164 -- Related name | |
165 if query.with then | |
166 where[#where+1] = "`with` = ?"; | |
167 args[#args+1] = query.with | |
168 end | |
169 | |
170 -- Unique id | |
171 if query.key then | |
172 where[#where+1] = "`key` = ?"; | |
173 args[#args+1] = query.key | |
174 end | |
175 end | |
176 local function archive_where_id_range(query, args, where) | |
177 local args_len = #args | |
178 -- Before or after specific item, exclusive | |
179 if query.after then -- keys better be unique! | |
180 where[#where+1] = "`sort_id` > (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
181 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3]; | |
182 args_len = args_len + 4 | |
183 end | |
184 if query.before then | |
185 where[#where+1] = "`sort_id` < (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
186 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3]; | |
187 end | |
188 end | |
189 | |
190 function archive_store:find(username, query) | |
191 query = query or {}; | |
192 local user,store = username,self.store; | |
193 local total; | |
194 local ok, result = engine:transaction(function() | |
195 local sql_query = "SELECT `key`, `type`, `value`, `when` FROM `prosodyarchive` WHERE %s ORDER BY `sort_id` %s%s;"; | |
196 local args = { host, user or "", store, }; | |
197 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
198 | |
199 archive_where(query, args, where); | |
200 | |
201 -- Total matching | |
202 if query.total then | |
203 local stats = engine:select("SELECT COUNT(*) FROM `prosodyarchive` WHERE " .. t_concat(where, " AND "), unpack(args)); | |
204 if stats then | |
205 local _total = stats() | |
206 total = _total and _total[1]; | |
207 end | |
208 if query.limit == 0 then -- Skip the real query | |
209 return noop, total; | |
210 end | |
211 end | |
212 | |
213 archive_where_id_range(query, args, where); | |
214 | |
215 if query.limit then | |
216 args[#args+1] = query.limit; | |
217 end | |
218 | |
219 sql_query = sql_query:format(t_concat(where, " AND "), query.reverse and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); | |
220 module:log("debug", sql_query); | |
221 return engine:select(sql_query, unpack(args)); | |
222 end); | |
223 if not ok then return ok, result end | |
224 return function() | |
225 local row = result(); | |
226 if row ~= nil then | |
227 return row[1], deserialize(row[2], row[3]), row[4]; | |
228 end | |
229 end, total; | |
230 end | |
231 | |
232 function archive_store:delete(username, query) | |
233 query = query or {}; | |
234 local user,store = username,self.store; | |
235 return engine:transaction(function() | |
236 local sql_query = "DELETE FROM `prosodyarchive` WHERE %s;"; | |
237 local args = { host, user or "", store, }; | |
238 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
239 if user == true then | |
240 table.remove(args, 2); | |
241 table.remove(where, 2); | |
242 end | |
243 archive_where(query, args, where); | |
244 archive_where_id_range(query, args, where); | |
245 sql_query = sql_query:format(t_concat(where, " AND ")); | |
246 module:log("debug", sql_query); | |
247 return engine:delete(sql_query, unpack(args)); | |
248 end); | |
249 end | |
250 | |
251 local stores = { | |
252 keyval = keyval_store; | |
253 archive = archive_store; | |
254 }; | |
255 | |
256 --- Implement storage driver API | |
257 | |
258 -- FIXME: Some of these operations need to operate on the archive store(s) too | |
259 | |
260 local driver = {}; | |
261 | |
262 function driver:open(store, typ) | |
263 local store_mt = stores[typ or "keyval"]; | |
264 if store_mt then | |
265 return setmetatable({ store = store }, store_mt); | |
266 end | |
267 return nil, "unsupported-store"; | |
268 end | |
269 | |
270 function driver:stores(username) | |
271 local query = "SELECT DISTINCT `store` FROM `prosody` WHERE `host`=? AND `user`" .. | |
272 (username == true and "!=?" or "=?"); | |
273 if username == true or not username then | |
274 username = ""; | |
275 end | |
276 local ok, result = engine:transaction(function() | |
277 return engine:select(query, host, username); | |
278 end); | |
279 if not ok then return ok, result end | |
280 return iterator(result); | |
281 end | |
282 | |
283 function driver:purge(username) | |
284 return engine:transaction(function() | |
285 local stmt,err = engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=?", host, username); | |
286 return true, err; | |
287 end); | |
288 end | |
289 | |
290 --- Initialization | |
291 | |
292 | |
293 local function create_table(name) | |
294 local Table, Column, Index = sql.Table, sql.Column, sql.Index; | |
30 | 295 |
31 local ProsodyTable = Table { | 296 local ProsodyTable = Table { |
32 name="prosody"; | 297 name= name or "prosody"; |
33 Column { name="host", type="TEXT", nullable=false }; | 298 Column { name="host", type="TEXT", nullable=false }; |
34 Column { name="user", type="TEXT", nullable=false }; | 299 Column { name="user", type="TEXT", nullable=false }; |
35 Column { name="store", type="TEXT", nullable=false }; | 300 Column { name="store", type="TEXT", nullable=false }; |
36 Column { name="key", type="TEXT", nullable=false }; | 301 Column { name="key", type="TEXT", nullable=false }; |
37 Column { name="type", type="TEXT", nullable=false }; | 302 Column { name="type", type="TEXT", nullable=false }; |
58 engine:transaction(function() | 323 engine:transaction(function() |
59 ProsodyArchiveTable:create(engine); | 324 ProsodyArchiveTable:create(engine); |
60 end); | 325 end); |
61 end | 326 end |
62 | 327 |
63 local function upgrade_table() | 328 local function upgrade_table(params, apply_changes) |
329 local changes = false; | |
64 if params.driver == "MySQL" then | 330 if params.driver == "MySQL" then |
65 local success,err = engine:transaction(function() | 331 local success,err = engine:transaction(function() |
66 local result = engine:execute("SHOW COLUMNS FROM prosody WHERE Field='value' and Type='text'"); | 332 local result = engine:execute("SHOW COLUMNS FROM prosody WHERE Field='value' and Type='text'"); |
67 if result:rowcount() > 0 then | 333 if result:rowcount() > 0 then |
68 module:log("info", "Upgrading database schema..."); | 334 changes = true; |
69 engine:execute("ALTER TABLE prosody MODIFY COLUMN `value` MEDIUMTEXT"); | 335 if apply_changes then |
70 module:log("info", "Database table automatically upgraded"); | 336 module:log("info", "Upgrading database schema..."); |
337 engine:execute("ALTER TABLE prosody MODIFY COLUMN `value` MEDIUMTEXT"); | |
338 module:log("info", "Database table automatically upgraded"); | |
339 end | |
71 end | 340 end |
72 return true; | 341 return true; |
73 end); | 342 end); |
74 if not success then | 343 if not success then |
75 module:log("error", "Failed to check/upgrade database schema (%s), please see " | 344 module:log("error", "Failed to check/upgrade database schema (%s), please see " |
76 .."http://prosody.im/doc/mysql for help", | 345 .."http://prosody.im/doc/mysql for help", |
77 err or "unknown error"); | 346 err or "unknown error"); |
78 return false; | 347 return false; |
79 end | 348 end |
80 -- COMPAT w/pre-0.9: Upgrade tables to UTF-8 if not already | 349 |
350 -- COMPAT w/pre-0.10: Upgrade table to UTF-8 if not already | |
81 local check_encoding_query = "SELECT `COLUMN_NAME`,`COLUMN_TYPE` FROM `information_schema`.`columns` WHERE `TABLE_NAME`='prosody' AND ( `CHARACTER_SET_NAME`!='utf8' OR `COLLATION_NAME`!='utf8_bin' );"; | 351 local check_encoding_query = "SELECT `COLUMN_NAME`,`COLUMN_TYPE` FROM `information_schema`.`columns` WHERE `TABLE_NAME`='prosody' AND ( `CHARACTER_SET_NAME`!='utf8' OR `COLLATION_NAME`!='utf8_bin' );"; |
82 success,err = engine:transaction(function() | 352 success,err = engine:transaction(function() |
83 local result = engine:execute(check_encoding_query); | 353 local result = engine:execute(check_encoding_query); |
84 local n_bad_columns = result:rowcount(); | 354 local n_bad_columns = result:rowcount(); |
85 if n_bad_columns > 0 then | 355 if n_bad_columns > 0 then |
86 module:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns); | 356 changes = true; |
87 local fix_column_query1 = "ALTER TABLE `prosody` CHANGE `%s` `%s` BLOB;"; | 357 if apply_changes then |
88 local fix_column_query2 = "ALTER TABLE `prosody` CHANGE `%s` `%s` %s CHARACTER SET 'utf8' COLLATE 'utf8_bin';"; | 358 module:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns); |
89 for row in result:rows() do | 359 local fix_column_query1 = "ALTER TABLE `prosody` CHANGE `%s` `%s` BLOB;"; |
90 local column_name, column_type = unpack(row); | 360 local fix_column_query2 = "ALTER TABLE `prosody` CHANGE `%s` `%s` %s CHARACTER SET 'utf8' COLLATE 'utf8_bin';"; |
91 engine:execute(fix_column_query1:format(column_name, column_name)); | 361 for row in result:rows() do |
92 engine:execute(fix_column_query2:format(column_name, column_name, column_type)); | 362 local column_name, column_type = unpack(row); |
363 engine:execute(fix_column_query1:format(column_name, column_name)); | |
364 engine:execute(fix_column_query2:format(column_name, column_name, column_type)); | |
365 end | |
366 module:log("info", "Database encoding upgrade complete!"); | |
93 end | 367 end |
94 module:log("info", "Database encoding upgrade complete!"); | |
95 end | 368 end |
96 end); | 369 end); |
97 success,err = engine:transaction(function() return engine:execute(check_encoding_query); end); | 370 success,err = engine:transaction(function() return engine:execute(check_encoding_query); end); |
98 if not success then | 371 if not success then |
99 module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error"); | 372 module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error"); |
100 end | 373 return false; |
101 end | 374 end |
102 end | 375 end |
103 | 376 end |
104 do -- process options to get a db connection | 377 |
105 params = params or { driver = "SQLite3" }; | 378 local function normalize_params(params) |
106 | 379 assert(params.driver and params.database, "Configuration error: Both the SQL driver and the database need to be specified"); |
107 if params.driver == "SQLite3" then | 380 if params.driver == "SQLite3" then |
108 params.database = resolve_relative_path(prosody.paths.data or ".", params.database or "prosody.sqlite"); | 381 params.database = resolve_relative_path(prosody.paths.data or ".", params.database or "prosody.sqlite"); |
109 end | 382 end |
110 | 383 return params; |
111 assert(params.driver and params.database, "Both the SQL driver and the database need to be specified"); | 384 end |
112 | 385 |
113 --local dburi = db2uri(params); | 386 function module.load() |
114 engine = mod_sql:create_engine(params); | 387 if prosody.prosodyctl then return; end |
115 | 388 local params = normalize_params(module:get_option("sql", default_params)); |
116 if module:get_option("sql_manage_tables", true) then | 389 engine = sql:create_engine(params, function (engine) |
117 -- Automatically create table, ignore failure (table probably already exists) | 390 if module:get_option("sql_manage_tables", true) then |
118 create_table(); | 391 -- Automatically create table, ignore failure (table probably already exists) |
119 -- Encoding mess | 392 -- FIXME: we should check in information_schema, etc. |
120 upgrade_table(); | 393 create_table(); |
121 end | 394 -- Check whether the table needs upgrading |
122 end | 395 if not upgrade_table(params, true) then |
123 | 396 module:log("error", "Old database format detected, and upgrade failed"); |
124 local function serialize(value) | 397 return false, "database upgrade needed"; |
125 local t = type(value); | 398 end |
126 if t == "string" or t == "boolean" or t == "number" then | 399 end |
127 return t, tostring(value); | 400 end); |
128 elseif is_stanza(value) then | 401 |
129 return "xml", tostring(value); | 402 module:provides("storage", driver); |
130 elseif t == "table" then | 403 end |
131 local value,err = json.encode(value); | |
132 if value then return "json", value; end | |
133 return nil, err; | |
134 end | |
135 return nil, "Unhandled value type: "..t; | |
136 end | |
137 local function deserialize(t, value) | |
138 if t == "string" then return value; | |
139 elseif t == "boolean" then | |
140 if value == "true" then return true; | |
141 elseif value == "false" then return false; end | |
142 elseif t == "number" then return tonumber(value); | |
143 elseif t == "json" then | |
144 return json.decode(value); | |
145 elseif t == "xml" then | |
146 return xml_parse(value); | |
147 end | |
148 end | |
149 | |
150 local host = module.host; | |
151 local user, store; | |
152 | |
153 local function keyval_store_get() | |
154 local haveany; | |
155 local result = {}; | |
156 for row in engine:select("SELECT `key`,`type`,`value` FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store) do | |
157 haveany = true; | |
158 local k = row[1]; | |
159 local v = deserialize(row[2], row[3]); | |
160 if k and v then | |
161 if k ~= "" then result[k] = v; elseif type(v) == "table" then | |
162 for a,b in pairs(v) do | |
163 result[a] = b; | |
164 end | |
165 end | |
166 end | |
167 end | |
168 if haveany then | |
169 return result; | |
170 end | |
171 end | |
172 local function keyval_store_set(data) | |
173 engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=? AND `store`=?", host, user or "", store); | |
174 | |
175 if data and next(data) ~= nil then | |
176 local extradata = {}; | |
177 for key, value in pairs(data) do | |
178 if type(key) == "string" and key ~= "" then | |
179 local t, value = serialize(value); | |
180 assert(t, value); | |
181 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, key, t, value); | |
182 else | |
183 extradata[key] = value; | |
184 end | |
185 end | |
186 if next(extradata) ~= nil then | |
187 local t, extradata = serialize(extradata); | |
188 assert(t, extradata); | |
189 engine:insert("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)", host, user or "", store, "", t, extradata); | |
190 end | |
191 end | |
192 return true; | |
193 end | |
194 | |
195 --- Key/value store API (default store type) | |
196 | |
197 local keyval_store = {}; | |
198 keyval_store.__index = keyval_store; | |
199 function keyval_store:get(username) | |
200 user, store = username, self.store; | |
201 local ok, result = engine:transaction(keyval_store_get); | |
202 if not ok then | |
203 module:log("error", "Unable to read from database %s store for %s: %s", store, username or "<host>", result); | |
204 return nil, result; | |
205 end | |
206 return result; | |
207 end | |
208 function keyval_store:set(username, data) | |
209 user,store = username,self.store; | |
210 return engine:transaction(function() | |
211 return keyval_store_set(data); | |
212 end); | |
213 end | |
214 function keyval_store:users() | |
215 local ok, result = engine:transaction(function() | |
216 return engine:select("SELECT DISTINCT `user` FROM `prosody` WHERE `host`=? AND `store`=?", host, self.store); | |
217 end); | |
218 if not ok then return ok, result end | |
219 return iterator(result); | |
220 end | |
221 | |
222 --- Archive store API | |
223 | |
224 local archive_store = {} | |
225 archive_store.__index = archive_store | |
226 function archive_store:append(username, key, when, with, value) | |
227 if value == nil then -- COMPAT early versions | |
228 when, with, value, key = key, when, with, value | |
229 end | |
230 local user,store = username,self.store; | |
231 return engine:transaction(function() | |
232 if key then | |
233 engine:delete("DELETE FROM `prosodyarchive` WHERE `host`=? AND `user`=? AND `store`=? AND `key`=?", host, user or "", store, key); | |
234 else | |
235 key = uuid.generate(); | |
236 end | |
237 local t, value = serialize(value); | |
238 engine:insert("INSERT INTO `prosodyarchive` (`host`, `user`, `store`, `when`, `with`, `key`, `type`, `value`) VALUES (?,?,?,?,?,?,?,?)", host, user or "", store, when, with, key, t, value); | |
239 return key; | |
240 end); | |
241 end | |
242 | |
243 -- Helpers for building the WHERE clause | |
244 local function archive_where(query, args, where) | |
245 -- Time range, inclusive | |
246 if query.start then | |
247 args[#args+1] = query.start | |
248 where[#where+1] = "`when` >= ?" | |
249 end | |
250 | |
251 if query["end"] then | |
252 args[#args+1] = query["end"]; | |
253 if query.start then | |
254 where[#where] = "`when` BETWEEN ? AND ?" -- is this inclusive? | |
255 else | |
256 where[#where+1] = "`when` <= ?" | |
257 end | |
258 end | |
259 | |
260 -- Related name | |
261 if query.with then | |
262 where[#where+1] = "`with` = ?"; | |
263 args[#args+1] = query.with | |
264 end | |
265 | |
266 -- Unique id | |
267 if query.key then | |
268 where[#where+1] = "`key` = ?"; | |
269 args[#args+1] = query.key | |
270 end | |
271 end | |
272 local function archive_where_id_range(query, args, where) | |
273 local args_len = #args | |
274 -- Before or after specific item, exclusive | |
275 if query.after then -- keys better be unique! | |
276 where[#where+1] = "`sort_id` > (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
277 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.after, args[1], args[2], args[3]; | |
278 args_len = args_len + 4 | |
279 end | |
280 if query.before then | |
281 where[#where+1] = "`sort_id` < (SELECT `sort_id` FROM `prosodyarchive` WHERE `key` = ? AND `host` = ? AND `user` = ? AND `store` = ? LIMIT 1)" | |
282 args[args_len+1], args[args_len+2], args[args_len+3], args[args_len+4] = query.before, args[1], args[2], args[3]; | |
283 end | |
284 end | |
285 | |
286 function archive_store:find(username, query) | |
287 query = query or {}; | |
288 local user,store = username,self.store; | |
289 local total; | |
290 local ok, result = engine:transaction(function() | |
291 local sql_query = "SELECT `key`, `type`, `value`, `when` FROM `prosodyarchive` WHERE %s ORDER BY `sort_id` %s%s;"; | |
292 local args = { host, user or "", store, }; | |
293 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
294 | |
295 archive_where(query, args, where); | |
296 | |
297 -- Total matching | |
298 if query.total then | |
299 local stats = engine:select("SELECT COUNT(*) FROM `prosodyarchive` WHERE " .. t_concat(where, " AND "), unpack(args)); | |
300 if stats then | |
301 local _total = stats() | |
302 total = _total and _total[1]; | |
303 end | |
304 if query.limit == 0 then -- Skip the real query | |
305 return noop, total; | |
306 end | |
307 end | |
308 | |
309 archive_where_id_range(query, args, where); | |
310 | |
311 if query.limit then | |
312 args[#args+1] = query.limit; | |
313 end | |
314 | |
315 sql_query = sql_query:format(t_concat(where, " AND "), query.reverse and "DESC" or "ASC", query.limit and " LIMIT ?" or ""); | |
316 module:log("debug", sql_query); | |
317 return engine:select(sql_query, unpack(args)); | |
318 end); | |
319 if not ok then return ok, result end | |
320 return function() | |
321 local row = result(); | |
322 if row ~= nil then | |
323 return row[1], deserialize(row[2], row[3]), row[4]; | |
324 end | |
325 end, total; | |
326 end | |
327 | |
328 function archive_store:delete(username, query) | |
329 query = query or {}; | |
330 local user,store = username,self.store; | |
331 return engine:transaction(function() | |
332 local sql_query = "DELETE FROM `prosodyarchive` WHERE %s;"; | |
333 local args = { host, user or "", store, }; | |
334 local where = { "`host` = ?", "`user` = ?", "`store` = ?", }; | |
335 if user == true then | |
336 table.remove(args, 2); | |
337 table.remove(where, 2); | |
338 end | |
339 archive_where(query, args, where); | |
340 archive_where_id_range(query, args, where); | |
341 sql_query = sql_query:format(t_concat(where, " AND ")); | |
342 module:log("debug", sql_query); | |
343 return engine:delete(sql_query, unpack(args)); | |
344 end); | |
345 end | |
346 | |
347 local stores = { | |
348 keyval = keyval_store; | |
349 archive = archive_store; | |
350 }; | |
351 | |
352 --- Implement storage driver API | |
353 | |
354 -- FIXME: Some of these operations need to operate on the archive store(s) too | |
355 | |
356 local driver = {}; | |
357 | |
358 function driver:open(store, typ) | |
359 local store_mt = stores[typ or "keyval"]; | |
360 if store_mt then | |
361 return setmetatable({ store = store }, store_mt); | |
362 end | |
363 return nil, "unsupported-store"; | |
364 end | |
365 | |
366 function driver:stores(username) | |
367 local query = "SELECT DISTINCT `store` FROM `prosody` WHERE `host`=? AND `user`" .. | |
368 (username == true and "!=?" or "=?"); | |
369 if username == true or not username then | |
370 username = ""; | |
371 end | |
372 local ok, result = engine:transaction(function() | |
373 return engine:select(query, host, username); | |
374 end); | |
375 if not ok then return ok, result end | |
376 return iterator(result); | |
377 end | |
378 | |
379 function driver:purge(username) | |
380 return engine:transaction(function() | |
381 local stmt,err = engine:delete("DELETE FROM `prosody` WHERE `host`=? AND `user`=?", host, username); | |
382 return true, err; | |
383 end); | |
384 end | |
385 | |
386 module:provides("storage", driver); | |
387 | |
388 |