Comparison

tools/migration/migrator/prosody_sql.lua @ 4216:ff80a8471e86

tools/migration/*: Numerous changes and restructuring, and the addition of a Makefile
author Matthew Wild <mwild1@gmail.com>
date Sat, 26 Feb 2011 00:23:48 +0000
parent 4197:tools/migration/prosody_sql.lua@bef732980436
child 4234:ce92aafc9c03
comparison
equal deleted inserted replaced
4214:1674cd17557c 4216:ff80a8471e86
1
2 local assert = assert;
3 local DBI = require "DBI";
4 local print = print;
5 local type = type;
6 local next = next;
7 local pairs = pairs;
8 local t_sort = table.sort;
9 local json = require "util.json";
10 local mtools = require "migrator.mtools";
11 local tostring = tostring;
12 local tonumber = tonumber;
13
14 module "prosody_sql"
15
--- Create the `prosody` table and its lookup index on a connection.
-- Table creation is best-effort: if the CREATE TABLE statement cannot be
-- prepared, or executing/committing it reports failure, the function returns
-- quietly and no index is attempted. Once the index statement has been
-- prepared, a failure to execute or commit it raises an error.
-- @param connection handle providing :prepare(sql) and :commit()
-- @param params table; only params.driver is read ("PostgreSQL" and "MySQL"
--        get driver-specific SQL, anything else uses the SQL unchanged)
local function create_table(connection, params)
	local driver = params.driver;

	local table_sql = "CREATE TABLE `prosody` (`host` TEXT, `user` TEXT, `store` TEXT, `key` TEXT, `type` TEXT, `value` TEXT);";
	if driver == "PostgreSQL" then
		-- swap backtick quoting for double-quoted identifiers
		table_sql = table_sql:gsub("`", "\"");
	end

	local table_stmt = connection:prepare(table_sql);
	if not table_stmt then
		return;
	end

	-- The commit is issued whether or not the execute succeeded.
	local created = table_stmt:execute();
	local committed = connection:commit();
	if not (created and committed) then
		return;
	end

	local index_sql = "CREATE INDEX `prosody_index` ON `prosody` (`host`, `user`, `store`, `key`)";
	if driver == "PostgreSQL" then
		index_sql = index_sql:gsub("`", "\"");
	elseif driver == "MySQL" then
		-- append a (20) prefix length to every indexed column name
		index_sql = index_sql:gsub("`([,)])", "`(20)%1");
	end

	local index_stmt = connection:prepare(index_sql);
	if index_stmt then
		assert(index_stmt:execute());
		assert(connection:commit());
	end
end
42
--- Turn a Lua value into a (type tag, string) pair for storage.
-- Strings, booleans and numbers are tagged with their Lua type name and
-- stringified with tostring(); tables are tagged "json" and encoded with
-- json.encode.
-- @param value the value to store
-- @return type tag and encoded string, or nil plus an error message when the
--         value's type is unsupported or json.encode returns a falsy result
local function serialize(value)
	local kind = type(value);

	if kind == "table" then
		local encoded, err = json.encode(value);
		if not encoded then
			return nil, err;
		end
		return "json", encoded;
	end

	if kind == "string" or kind == "boolean" or kind == "number" then
		return kind, tostring(value);
	end

	return nil, "Unhandled value type: "..kind;
end
--- Rebuild a Lua value from a stored (type tag, string) pair.
-- @param t type tag: "string", "boolean", "number" or "json"
-- @param value the stored string
-- @return the decoded value; nothing for an unknown tag or for a "boolean"
--         whose text is neither "true" nor "false"
local function deserialize(t, value)
	if t == "json" then
		return json.decode(value);
	elseif t == "number" then
		return tonumber(value);
	elseif t == "string" then
		return value;
	elseif t == "boolean" then
		if value == "true" then
			return true;
		end
		if value == "false" then
			return false;
		end
	end
end
64
--- Assemble one user's data from grouped SQL rows.
-- `item` is an array of stores; each store is an array of rows carrying the
-- fields host, user, store, key, type and value. The user and host are taken
-- from the first row of the first store.
--
-- Rows with a non-empty key become `result[key] = value`. A row whose key is
-- the empty string and whose value decodes to a table has that table's
-- entries merged into the store.
-- @param item array of stores (each an array of rows) for a single user
-- @return table { user = ..., host = ..., stores = { [store_name] = data } }
local function decode_user(item)
	local first_row = item[1][1];
	local userdata = {
		user = first_row.user;
		host = first_row.host;
		stores = {};
	};
	for store_index = 1, #item do -- loop over stores
		local result = {};
		local store = item[store_index];
		for row_index = 1, #store do -- loop over store data
			local row = store[row_index];
			local k = row.key;
			local v = deserialize(row.type, row.value);
			-- Compare against nil explicitly: a stored boolean `false` is a
			-- legitimate value and must not be dropped by a truthiness test.
			if k and v ~= nil then
				if k ~= "" then
					result[k] = v;
				elseif type(v) == "table" then
					for a, b in pairs(v) do
						result[a] = b;
					end
				end
			end
		end
		-- A store is only registered when it has at least one row, which is
		-- also where its name comes from.
		if #store > 0 then
			userdata.stores[store[1].store] = result;
		end
	end
	return userdata;
end
90
--- Open the source database and return an iterator over its users.
-- Connects through DBI using input.driver and input.database (both required)
-- plus the optional input.username, input.password, input.host and
-- input.port, selects every row of the `prosody` table, and hands the rows to
-- mtools.sorted / mtools.merged to group them first into stores and then
-- into users.
-- @param input connection settings table
-- @return function yielding one decoded user per call, and a falsy value once
--         the underlying iterator returns one
function reader(input)
	local driver = assert(input.driver, "no input.driver specified");
	local database = assert(input.database, "no input.database specified");
	local dbh = assert(DBI.Connect(
		driver, database,
		input.username, input.password,
		input.host, input.port
	));
	assert(dbh:ping());

	local select_stmt = assert(dbh:prepare("SELECT * FROM prosody"));
	assert(select_stmt:execute());

	local required_fields = {"host", "user", "store", "key", "type", "value"};
	local next_row, state, current = select_stmt:rows(true);

	-- Pull the next row out of the statement's row iterator.
	local function read_row()
		current = next_row(state, current);
		return current;
	end

	-- Reject rows lacking any required field; turn empty host/user/store
	-- strings into nil on the rows that are kept.
	local function check_row(x)
		for _, field in pairs(required_fields) do
			if not x[field] then return false; end -- TODO log error, missing field
		end
		if x.host == "" then x.host = nil; end
		if x.user == "" then x.user = nil; end
		if x.store == "" then x.store = nil; end
		return x;
	end

	-- Order by host, then user, then store; true when a's field is the
	-- greater one at the first field that differs (nil compares as "").
	local function row_order(a, b)
		local a_host, b_host = a.host or "", b.host or "";
		if a_host ~= b_host then
			return a_host > b_host;
		end
		local a_user, b_user = a.user or "", b.user or "";
		if a_user ~= b_user then
			return a_user > b_user;
		end
		return (a.store or "") > (b.store or "");
	end

	-- get SQL rows, sorted
	local iter = mtools.sorted {
		reader = read_row;
		filter = check_row;
		sorter = row_order;
	};
	-- merge rows to get stores
	iter = mtools.merged(iter, function(a, b)
		return (a.host == b.host and a.user == b.user and a.store == b.store);
	end);
	-- merge stores to get users
	iter = mtools.merged(iter, function(a, b)
		return (a[1].host == b[1].host and a[1].user == b[1].user);
	end);

	return function()
		local grouped = iter();
		return grouped and decode_user(grouped);
	end;
end
134
--- Open the destination database and return a sink for user items.
-- Connects through DBI using output.driver and output.database (both
-- required) plus the optional output.username, output.password, output.host
-- and output.port. It then calls create_table, runs a SELECT against the
-- `prosody` table, deletes every existing row from it and prepares the
-- INSERT statement used by the sink.
--
-- The returned function takes one user item ({ host, user, stores }) and
-- inserts one row per entry whose key is a non-empty string. All other
-- entries of a store are collected into a single table that is serialized
-- into one extra row under the empty key "". Calling the sink with a falsy
-- item commits and closes the connection.
-- @param output connection settings table
-- @param iter not used by this function
-- @return sink function(item)
function writer(output, iter)
	local dbh = assert(DBI.Connect(
		assert(output.driver, "no output.driver specified"),
		assert(output.database, "no output.database specified"),
		output.username, output.password,
		output.host, output.port
	));
	assert(dbh:ping());
	create_table(dbh, output);

	-- NOTE(review): presumably a check that the table is usable, since
	-- create_table does not report failure - confirm.
	local select_stmt = assert(dbh:prepare("SELECT * FROM prosody"));
	assert(select_stmt:execute());
	local delete_stmt = assert(dbh:prepare("DELETE FROM prosody"));
	assert(delete_stmt:execute());

	local insert_sql = "INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)";
	if output.driver == "PostgreSQL" then
		-- PostgreSQL quotes identifiers with double quotes, not backticks;
		-- this mirrors the conversion create_table applies to its own SQL.
		-- Assigning to a single variable drops gsub's second result.
		insert_sql = insert_sql:gsub("`", "\"");
	end
	local insert = assert(dbh:prepare(insert_sql));

	return function(item)
		if not item then assert(dbh:commit()) return dbh:close(); end -- end of input
		local host = item.host or "";
		local user = item.user or "";
		for store, data in pairs(item.stores) do
			-- TODO transactions
			local extradata = {};
			for key, value in pairs(data) do
				if type(key) == "string" and key ~= "" then
					local t, encoded = assert(serialize(value));
					assert(insert:execute(host, user, store, key, t, encoded));
				else
					extradata[key] = value;
				end
			end
			if next(extradata) ~= nil then
				local t, encoded = assert(serialize(extradata));
				assert(insert:execute(host, user, store, "", t, encoded));
			end
		end
	end;
end
172
173
174 return _M;