Software /
code /
prosody
Comparison
tools/migration/prosody_sql.lua @ 4162:af720a91aa19
tools/migration/*: Initial commit of a new migration tool. Currently supports Prosody files and Prosody SQL as input and output.
author | Waqas Hussain <waqas20@gmail.com> |
---|---|
date | Wed, 23 Feb 2011 02:16:19 +0500 |
child | 4166:3ac90743039b |
comparison
equal
deleted
inserted
replaced
4161:c299726d2b4e | 4162:af720a91aa19 |
---|---|
1 | |
2 local assert = assert; | |
3 local DBI = require "DBI"; | |
4 local print = print; | |
5 local type = type; | |
6 local next = next; | |
7 local pairs = pairs; | |
8 local t_sort = table.sort; | |
9 local json = require "util.json"; | |
10 local mtools = require "mtools"; | |
11 local tostring = tostring; | |
12 local tonumber = tonumber; | |
13 | |
14 module "prosody_sql" | |
15 | |
16 local function create_table(connection, params) | |
17 local create_sql = "CREATE TABLE `prosody` (`host` TEXT, `user` TEXT, `store` TEXT, `key` TEXT, `type` TEXT, `value` TEXT);"; | |
18 if params.driver == "PostgreSQL" then | |
19 create_sql = create_sql:gsub("`", "\""); | |
20 end | |
21 | |
22 local stmt = connection:prepare(create_sql); | |
23 if stmt then | |
24 local ok = stmt:execute(); | |
25 local commit_ok = connection:commit(); | |
26 if ok and commit_ok then | |
27 local index_sql = "CREATE INDEX `prosody_index` ON `prosody` (`host`, `user`, `store`, `key`)"; | |
28 if params.driver == "PostgreSQL" then | |
29 index_sql = index_sql:gsub("`", "\""); | |
30 elseif params.driver == "MySQL" then | |
31 index_sql = index_sql:gsub("`([,)])", "`(20)%1"); | |
32 end | |
33 local stmt, err = connection:prepare(index_sql); | |
34 local ok, commit_ok, commit_err; | |
35 if stmt then | |
36 ok, err = assert(stmt:execute()); | |
37 commit_ok, commit_err = assert(connection:commit()); | |
38 end | |
39 end | |
40 end | |
41 end | |
42 | |
43 local function serialize(value) | |
44 local t = type(value); | |
45 if t == "string" or t == "boolean" or t == "number" then | |
46 return t, tostring(value); | |
47 elseif t == "table" then | |
48 local value,err = json.encode(value); | |
49 if value then return "json", value; end | |
50 return nil, err; | |
51 end | |
52 return nil, "Unhandled value type: "..t; | |
53 end | |
54 local function deserialize(t, value) | |
55 if t == "string" then return value; | |
56 elseif t == "boolean" then | |
57 if value == "true" then return true; | |
58 elseif value == "false" then return false; end | |
59 elseif t == "number" then return tonumber(value); | |
60 elseif t == "json" then | |
61 return json.decode(value); | |
62 end | |
63 end | |
64 | |
65 local function decode_user(item) | |
66 local userdata = { | |
67 user = item[1][1].user; | |
68 host = item[1][1].host; | |
69 stores = {}; | |
70 }; | |
71 for i=1,#item do -- loop over stores | |
72 local result = {}; | |
73 local store = item[i]; | |
74 for i=1,#store do -- loop over store data | |
75 local row = store[i]; | |
76 local k = row.key; | |
77 local v = deserialize(row.type, row.value); | |
78 if k and v then | |
79 if k ~= "" then result[k] = v; elseif type(v) == "table" then | |
80 for a,b in pairs(v) do | |
81 result[a] = b; | |
82 end | |
83 end | |
84 end | |
85 userdata.stores[store[1].store] = result; | |
86 end | |
87 end | |
88 return userdata; | |
89 end | |
90 | |
91 function reader(input) | |
92 local dbh = assert(DBI.Connect( | |
93 assert(input.driver, "no input.driver specified"), | |
94 assert(input.database, "no input.database specified"), | |
95 input.username, input.password, | |
96 input.host, input.port | |
97 )); | |
98 assert(dbh:ping()); | |
99 local stmt = assert(dbh:prepare("SELECT * FROM prosody")); | |
100 assert(stmt:execute()); | |
101 local keys = {"host", "user", "store", "key", "type", "value"}; | |
102 local f,s,val = stmt:rows(true); | |
103 -- get SQL rows, sorted | |
104 local iter = mtools.sorted { | |
105 reader = function() val = f(s, val); return val; end; | |
106 filter = function(x) | |
107 for i=1,#keys do | |
108 if not x[keys[i]] then return false; end -- TODO log error, missing field | |
109 end | |
110 if x.host == "" then x.host = nil; end | |
111 if x.user == "" then x.user = nil; end | |
112 if x.store == "" then x.store = nil; end | |
113 return x; | |
114 end; | |
115 sorter = function(a, b) | |
116 local a_host, a_user, a_store = a.host or "", a.user or "", a.store or ""; | |
117 local b_host, b_user, b_store = b.host or "", b.user or "", b.store or ""; | |
118 return a_host > b_host or (a_host==b_host and a_user > b_user) or (a_host==b_host and a_user==b_user and a_store > b_store); | |
119 end; | |
120 }; | |
121 -- merge rows to get stores | |
122 iter = mtools.merged(iter, function(a, b) | |
123 return (a.host == b.host and a.user == b.user and a.store == b.store); | |
124 end); | |
125 -- merge stores to get users | |
126 iter = mtools.merged(iter, function(a, b) | |
127 return (a[1].host == b[1].host and a[1].user == b[1].user); | |
128 end); | |
129 return function() | |
130 local x = iter(); | |
131 return x and decode_user(x); | |
132 end; | |
133 end | |
134 | |
135 function writer(output, iter) | |
136 local dbh = assert(DBI.Connect( | |
137 assert(output.driver, "no output.driver specified"), | |
138 assert(output.database, "no output.database specified"), | |
139 output.username, output.password, | |
140 output.host, output.port | |
141 )); | |
142 assert(dbh:ping()); | |
143 create_table(dbh, output); | |
144 local stmt = assert(dbh:prepare("SELECT * FROM prosody")); | |
145 assert(stmt:execute()); | |
146 local stmt = assert(dbh:prepare("DELETE FROM prosody")); | |
147 assert(stmt:execute()); | |
148 local insert = assert(dbh:prepare("INSERT INTO `prosody` (`host`,`user`,`store`,`key`,`type`,`value`) VALUES (?,?,?,?,?,?)")); | |
149 | |
150 return function(item) | |
151 if not item then assert(dbh:commit()) return dbh:close(); end -- end of input | |
152 local host = item.host or ""; | |
153 local user = item.user or ""; | |
154 for store, data in pairs(item.stores) do | |
155 -- TODO transactions | |
156 local extradata = {}; | |
157 for key, value in pairs(data) do | |
158 if type(key) == "string" and key ~= "" then | |
159 local t, value = assert(serialize(value)); | |
160 local ok, err = assert(insert:execute(host, user, store, key, t, value)); | |
161 else | |
162 extradata[key] = value; | |
163 end | |
164 end | |
165 if next(extradata) ~= nil then | |
166 local t, extradata = assert(serialize(extradata)); | |
167 local ok, err = assert(insert:execute(host, user, store, "", t, extradata)); | |
168 end | |
169 end | |
170 end; | |
171 end | |
172 | |
173 | |
174 return _M; |