Software /
code /
prosody-modules
Comparison
mod_mam_muc_sql/mod_mam_muc_sql.lua @ 821:87e847280aef
mod_mam_muc_sql: SQL variant of mod_mam_muc
author | Kim Alvefur <zash@zash.se> |
---|---|
date | Mon, 17 Sep 2012 20:15:27 +0200 |
child | 1343:7dbde05b48a9 |
comparison
equal
deleted
inserted
replaced
820:005037032d65 | 821:87e847280aef |
---|---|
1 -- XEP-0313: Message Archive Management for Prosody | |
2 -- Copyright (C) 2011-2012 Kim Alvefur | |
3 -- | |
4 -- This file is MIT/X11 licensed. | |
5 | |
6 local xmlns_mam = "urn:xmpp:mam:tmp"; | |
7 local xmlns_delay = "urn:xmpp:delay"; | |
8 local xmlns_forward = "urn:xmpp:forward:0"; | |
9 | |
10 local st = require "util.stanza"; | |
11 local rsm = module:require "mod_mam/rsm"; | |
12 local jid_bare = require "util.jid".bare; | |
13 local jid_split = require "util.jid".split; | |
14 local jid_prep = require "util.jid".prep; | |
15 local host = module.host; | |
16 | |
17 local serialize, deserialize = require"util.json".encode, require"util.json".decode; | |
18 local unpack = unpack; | |
19 local tostring = tostring; | |
20 local time_now = os.time; | |
21 local t_insert = table.insert; | |
22 local m_min = math.min; | |
23 local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse; | |
24 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50); | |
25 --local rooms_to_archive = module:get_option_set("rooms_to_archive",{}); | |
26 | |
27 local sql, setsql, getsql = {}; | |
28 do -- SQL stuff | |
29 local dburi; | |
30 local connection; | |
31 local connections = module:shared "/*/sql/connection-cache"; | |
32 local build_url = require"socket.url".build; | |
33 local resolve_relative_path = require "core.configmanager".resolve_relative_path; | |
34 local params = module:get_option("mam_sql", module:get_option("sql")); | |
35 | |
-- Build a URL-style string out of the database settings in `params`.
-- The result is used below as the key into the shared connection cache.
local function db2uri(params)
	local parts = {
		scheme = params.driver,
		user = params.username,
		password = params.password,
		host = params.host,
		port = params.port,
		path = params.database,
	};
	return build_url(parts);
end
46 | |
-- Check whether the currently held database handle is still alive.
-- Returns true when the handle answers a ping. Returns nil when there is no
-- handle, and nothing when the ping fails; in the latter case the handle is
-- forgotten both locally and in the shared connection cache.
local function test_connection()
	if not connection then
		return nil;
	end
	if not connection:ping() then
		module:log("debug", "Database connection closed");
		connection = nil;
		connections[dburi] = nil;
		return;
	end
	return true;
end
-- Return a usable database handle, opening a new one when the current
-- handle is missing or no longer answers a ping.
-- Returns the handle on success, or nil plus the error from DBI.Connect.
local function connect()
	if test_connection() then
		return connection;
	end

	-- Globals are unlocked only for the duration of the DBI.Connect call.
	prosody.unlock_globals();
	local handle, connect_err = DBI.Connect(
		params.driver, params.database,
		params.username, params.password,
		params.host, params.port
	);
	prosody.lock_globals();

	if not handle then
		module:log("debug", "Database connection failed: %s", tostring(connect_err));
		return nil, connect_err;
	end

	module:log("debug", "Successfully connected to database");
	handle:autocommit(false); -- don't commit automatically
	connection = handle;
	-- Publish the handle in the shared cache under this database's URI
	connections[dburi] = handle;
	return connection;
end
78 | |
do -- process options to get a db connection
	-- Load LuaDBI into the global `DBI`; globals are unlocked around the require.
	prosody.unlock_globals();
	local loaded;
	loaded, DBI = pcall(require, "DBI");
	if not loaded then
		-- On failure `DBI` holds the error message returned by pcall
		package.loaded["DBI"] = {};
		module:log("error", "Failed to load the LuaDBI library for accessing SQL databases: %s", DBI);
		module:log("error", "More information on installing LuaDBI can be found at http://prosody.im/doc/depends#luadbi");
	end
	prosody.lock_globals();
	if not (loaded and DBI.Connect) then
		return; -- Halt loading of this module
	end

	-- Fall back to SQLite3 when neither "mam_sql" nor "sql" is configured
	if not params then
		params = { driver = "SQLite3" };
	end

	-- SQLite database files are resolved relative to the data directory
	if params.driver == "SQLite3" then
		local base_dir = prosody.paths.data or ".";
		local db_file = params.database or "prosody.sqlite";
		params.database = resolve_relative_path(base_dir, db_file);
	end

	assert(params.driver and params.database, "Both the SQL driver and the database need to be specified");

	-- Reuse a handle from the shared cache when one exists for this database
	dburi = db2uri(params);
	connection = connections[dburi];

	assert(connect());
end
107 | |
-- Prepare and execute an SQL statement on the module's database connection.
--   sql: statement text using backtick-quoted identifiers and `?` placeholders
--   ...: values bound to the placeholders, in order
-- Returns the executed statement handle, or nil plus an error when preparing
-- or executing fails while the connection is still alive, or when a missing
-- connection cannot be re-established.
-- Raises "connection failed" when the connection is found dead after a
-- failed prepare or execute.
function getsql(sql, ...)
	-- test_connection() clears `connection` after a failed ping; without
	-- reconnecting here every later call would fail indexing a nil handle.
	if not connection then
		local handle, connect_err = connect();
		if not handle then return nil, connect_err; end
	end
	if params.driver == "PostgreSQL" then
		-- PostgreSQL quotes identifiers with double quotes, not backticks
		sql = sql:gsub("`", "\"");
	end
	-- do prepared statement stuff
	local stmt, err = connection:prepare(sql);
	if not stmt and not test_connection() then error("connection failed"); end
	if not stmt then module:log("error", "QUERY FAILED: %s %s", err, debug.traceback()); return nil, err; end
	-- run query
	local ok, exec_err = stmt:execute(...);
	if not ok and not test_connection() then error("connection failed"); end
	if not ok then return nil, exec_err; end

	return stmt;
end
-- Execute a statement through getsql and report how many rows it touched.
-- Returns the statement's affected-row count, or getsql's failure values
-- unchanged when the statement could not be run.
function setsql(sql, ...)
	local stmt, err = getsql(sql, ...);
	if stmt then
		return stmt:affected();
	end
	return stmt, err;
end
-- Roll back the current transaction when a connection is held, then return
-- the arguments unchanged so the call can wrap a return expression.
function sql.rollback(...)
	if not connection then
		return ...;
	end
	connection:rollback(); -- FIXME check for rollback error?
	return ...;
end
-- Commit the current transaction.
-- Returns the arguments unchanged on success, or nil plus "SQL commit failed"
-- when the commit fails or no connection is held. Called without arguments it
-- returns nothing on success, so pass a truthy value to detect failure.
function sql.commit(...)
	-- Guard against a cleared handle, as sql.rollback does, instead of
	-- raising on a nil index.
	if not connection or not connection:commit() then
		return nil, "SQL commit failed";
	end
	return ...;
end
136 | |
137 end | |
138 | |
local archive_store = "archive2";

-- Handle archive queries
-- Answers <query xmlns="urn:xmpp:mam:tmp"/> IQs addressed to a room: each
-- matching archived stanza is sent to the requester as a forwarded message,
-- followed by the IQ result carrying the result-set (RSM) summary.
-- Returns true once a reply has been sent; returns nothing when the room is
-- unknown, the sender is not allowed to read it, or the IQ is not a "get".
module:hook("iq/bare/"..xmlns_mam..":query", function(event)
	local origin, stanza = event.origin, event.stanza;
	local room = jid_split(stanza.attr.to);
	local query = stanza.tags[1];

	local room_obj = hosts[module.host].modules.muc.rooms[jid_bare(stanza.attr.to)];
	if not room_obj then
		return -- FIXME not found
	end
	local from = jid_bare(stanza.attr.from);

	-- Outcasts never get access; members-only rooms require an affiliation
	if room_obj._affiliations[from] == "outcast"
	or room_obj._data.members_only and not room_obj._affiliations[from] then
		return -- FIXME unauth
	end

	if stanza.attr.type ~= "get" then
		return;
	end

	local qid = query.attr.queryid;

	-- Search query parameters
	local qwith = query:get_child_text("with");
	local qstart = query:get_child_text("start");
	local qend = query:get_child_text("end");
	local qset = rsm.get(query);
	module:log("debug", "Archive query, id %s with %s from %s until %s)",
		tostring(qid), qwith or "anyone", qstart or "the dawn of time", qend or "now");

	if qstart or qend then -- Validate timestamps
		local vstart, vend = (qstart and timestamp_parse(qstart)), (qend and timestamp_parse(qend))
		if (qstart and not vstart) or (qend and not vend) then
			origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid timestamp"))
			return true
		end
		qstart, qend = vstart, vend;
	end

	local qres;
	if qwith then -- Validate the 'with' jid
		local pwith = jid_prep(qwith);
		if not pwith then -- it failed prepping
			origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid JID"))
			return true
		end
		-- Take the resource from the prepped JID so it is normalized the
		-- same way as the bare part
		local _, _, resource = jid_split(pwith);
		qwith = jid_bare(pwith);
		qres = resource;
	end

	-- RSM stuff
	local qmax = m_min(qset and qset.max or default_max_items, max_max_items);
	local first, last;
	local n = 0;

	-- Paging relies on `id` > ? together with LIMIT, which is only well
	-- defined when rows come back in `id` order, hence the ORDER BY.
	local sql_query = ([[
	SELECT `id`, `when`, `stanza`
	FROM `prosodyarchive`
	WHERE `host` = ? AND `user` = ? AND `store` = ?
	AND `when` BETWEEN ? AND ?
	%s
	AND `id` > ?
	ORDER BY `id`
	LIMIT ?;
	]]):format(qres and [[AND `resource` = ?]] or "")

	local p = {
		host, room, archive_store,
		qstart or 0, qend or time_now(),
		qset and tonumber(qset.after) or 0,
		qmax
	};
	if qres then
		-- Slot 6 lines up with the optional resource placeholder in the query
		t_insert(p, 6, qres);
	end
	-- Pass the parameters as a format argument so their content is never
	-- interpreted as part of the log format string
	module:log("debug", "getsql(sql_query, %s)", table.concat(p, ", "));
	local data, err = getsql(sql_query, unpack(p));
	if not data then
		-- Autocommit is off, so discard the failed transaction before replying
		sql.rollback();
		origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error loading archive: "..tostring(err)));
		return true
	end

	for item in data:rows() do
		local id, when, orig_stanza = unpack(item);
		--module:log("debug", "id is %s", id);

		local fwd_st = st.message{ to = stanza.attr.from, from = stanza.attr.to }
			:tag("result", { xmlns = xmlns_mam, queryid = qid, id = id }):up()
			:tag("forwarded", { xmlns = xmlns_forward })
				:tag("delay", { xmlns = xmlns_delay, stamp = timestamp(when) }):up();
		-- Stanzas are stored as JSON-encoded preserialized tables
		orig_stanza = st.deserialize(deserialize(orig_stanza));
		orig_stanza.attr.xmlns = "jabber:client";
		fwd_st:add_child(orig_stanza);
		origin.send(fwd_st);
		last = id;
		n = n + 1;
		if not first then
			first = id;
		end
	end
	-- That's all folks!
	module:log("debug", "Archive query %s completed", tostring(qid));

	local reply = st.reply(stanza);
	if last then
		-- This is a bit redundant, isn't it?
		reply:query(xmlns_mam):add_child(rsm.generate{first = first, last = last, count = n});
	end
	origin.send(reply);
	return true
end);
251 | |
252 -- Handle messages | |
-- Archive groupchat messages that carry a body or subject.
-- The stored copy has its `from` rewritten to the sender's occupant JID as
-- known to the room; the live stanza is left untouched.
-- Always returns nothing, so the message continues through other handlers.
local function message_handler(event)
	local origin, stanza = event.origin, event.stanza;
	local orig_type = stanza.attr.type or "normal";
	local orig_to = stanza.attr.to;
	-- Still needed?
	local orig_from = stanza.attr.from or origin.full_jid;

	-- Only store groupchat messages
	if not (orig_type == "groupchat" and (stanza:get_child("body") or stanza:get_child("subject"))) then
		return;
	end

	local room = jid_split(orig_to);
	local room_obj = hosts[host].modules.muc.rooms[orig_to]
	if not room_obj then return end

	-- Senders without an occupant entry in the room are not archived
	local occupant_jid = room_obj._jid_nick[orig_from];
	if not occupant_jid then return end

	local when = time_now();
	local archived = st.clone(stanza); -- Private copy
	--archived.attr.to = nil;
	archived.attr.from = occupant_jid;
	local _, _, nick = jid_split(occupant_jid);

	-- And stash it
	local ok, err = setsql([[
	INSERT INTO `prosodyarchive`
	(`host`, `user`, `store`, `when`, `resource`, `stanza`)
	VALUES (?, ?, ?, ?, ?, ?);
	]], host, room, archive_store, when, nick, serialize(st.preserialize(archived)))
	if ok then
		-- sql.commit() passes its arguments through on success, so give it a
		-- truthy marker to tell success apart from its nil, err failure.
		local committed, commit_err = sql.commit(true);
		if not committed then
			module:log("error", "Could not commit archived message: %s", tostring(commit_err));
			sql.rollback();
		end
	else
		module:log("error", "SQL error: %s", tostring(err));
		sql.rollback();
	end
	--[[ This was dropped from the spec
	if ok then
		stanza:tag("archived", { xmlns = xmlns_mam, by = host, id = id }):up();
	end
	--]]
end

-- Register the handler with priority 2 for messages addressed to bare JIDs
module:hook("message/bare", message_handler, 2);

module:add_feature(xmlns_mam);