Comparison

mod_muc_rai/mod_muc_rai.lua @ 3974:f14c862598a9

mod_muc_rai: New module to implement Room Activity Indicators
author Matthew Wild <mwild1@gmail.com>
date Wed, 15 Apr 2020 21:19:45 +0100
child 3997:0e72dd70afff
comparison
equal deleted inserted replaced
3973:df6227e288e5 3974:f14c862598a9
1 local cache = require "util.cache";
2 local jid = require "util.jid";
3 local st = require "util.stanza";
4
5 local max_subscribers = module:get_option_number("muc_rai_max_subscribers", 1024);
6
7 local muc_affiliation_store = module:open_store("config", "map");
8 local muc_archive = module:open_store("muc_log", "archive");
9
10 local xmlns_rai = "xmpp:prosody.im/protocol/rai";
11
12 local muc_markers = module:depends("muc_markers");
13
14 -- subscriber_jid -> { [room_jid] = interested }
15 local subscribed_users = cache.new(max_subscribers, false);
16 -- room_jid -> { [user_jid] = interested }
17 local interested_users = {};
18 -- room_jid -> last_id
19 local room_activity_cache = cache.new(1024);
20
21 -- Send a single notification for a room, updating data structures as needed
22 local function send_single_notification(user_jid, room_jid)
23 local notification = st.message({ to = user_jid, from = module.host })
24 :tag("rai", { xmlns = xmlns_rai })
25 :text_tag("activity", room_jid)
26 :up();
27 local interested_room_users = interested_users[room_jid];
28 if interested_room_users then
29 interested_room_users[user_jid] = nil;
30 end
31 local interested_rooms = subscribed_users:get(user_jid);
32 if interested_rooms then
33 interested_rooms[room_jid] = nil;
34 end
35 module:log("debug", "Sending notification from %s to %s", room_jid, user_jid);
36 return module:send(notification);
37 end
38
39 local function subscribe_room(user_jid, room_jid)
40 local interested_rooms = subscribed_users:get(user_jid);
41 if not interested_rooms then
42 return nil, "not-subscribed";
43 end
44 module:log("debug", "Subscribed %s to %s", user_jid, room_jid);
45 interested_rooms[room_jid] = true;
46
47 local interested_room_users = interested_users[room_jid];
48 if not interested_room_users then
49 interested_room_users = {};
50 interested_users[room_jid] = interested_room_users;
51 end
52 interested_room_users[user_jid] = true;
53 return true;
54 end
55
56 local function unsubscribe_room(user_jid, room_jid)
57 local interested_rooms = subscribed_users:get(user_jid);
58 if not interested_rooms then
59 return nil, "not-subscribed";
60 end
61 interested_rooms[room_jid] = nil;
62
63 local interested_room_users = interested_users[room_jid];
64 if not interested_room_users then
65 return true;
66 end
67 interested_room_users[user_jid] = nil;
68 return true;
69 end
70
71 local function notify_interested_users(room_jid)
72 module:log("warn", "NOTIFYING FOR %s", room_jid)
73 local interested_room_users = interested_users[room_jid];
74 if not interested_room_users then
75 module:log("debug", "Nobody interested in %s", room_jid);
76 return;
77 end
78 for user_jid in pairs(interested_room_users) do
79 send_single_notification(user_jid, room_jid);
80 end
81 return true;
82 end
83
84 local function unsubscribe_user_from_all_rooms(user_jid)
85 local interested_rooms = subscribed_users:get(user_jid);
86 if not interested_rooms then
87 return nil, "not-subscribed";
88 end
89 for room_jid in pairs(interested_rooms) do
90 unsubscribe_room(user_jid, room_jid);
91 end
92 return true;
93 end
94
95 local function get_last_room_message_id(room_jid)
96 local last_room_message_id = room_activity_cache:get(room_jid);
97 if last_room_message_id then
98 return last_room_message_id;
99 end
100
101 -- Load all the data!
102 local query = {
103 limit = 1;
104 reverse = true;
105 with = "message<groupchat";
106 }
107 local data, err = muc_archive:find(jid.node(room_jid), query);
108
109 if not data then
110 module:log("error", "Could not fetch history: %s", err);
111 return nil;
112 end
113
114 local id = data();
115 room_activity_cache:set(room_jid, id);
116 return id;
117 end
118
119 local function update_room_activity(room_jid, last_id)
120 room_activity_cache:set(room_jid, last_id);
121 end
122
123 local function get_last_user_read_id(user_jid, room_jid)
124 return muc_markers.get_user_read_marker(user_jid, room_jid);
125 end
126
127 local function has_new_activity(room_jid, user_jid)
128 local last_room_message_id = get_last_room_message_id(room_jid);
129 local last_user_read_id = get_last_user_read_id(user_jid, room_jid);
130 return last_room_message_id ~= last_user_read_id;
131 end
132
133 -- Returns a set of rooms that a user is interested in
134 local function get_interested_rooms(user_jid)
135 -- Use affiliation as an indication of interest, return
136 -- all rooms a user is affiliated
137 return muc_affiliation_store:get_all(jid.bare(user_jid));
138 end
139
140 -- Subscribes to all rooms that the user has an interest in
141 -- Returns a set of room JIDs that have already had activity (thus no subscription)
142 local function subscribe_all_rooms(user_jid)
143 -- Send activity notifications for all relevant rooms
144 local interested_rooms, err = get_interested_rooms(user_jid);
145
146 if not interested_rooms then
147 if err then
148 return nil, "internal-server-error";
149 end
150 interested_rooms = {};
151 end
152
153 if not subscribed_users:set(user_jid, interested_rooms) then
154 module:log("warn", "Subscriber limit (%d) reached, rejecting subscription from %s", max_subscribers, user_jid);
155 return nil, "resource-constraint";
156 end
157
158 local rooms_with_activity;
159 for room_name in pairs(interested_rooms) do
160 local room_jid = room_name.."@"..module.host;
161 if has_new_activity(room_jid, user_jid) then
162 -- There has already been activity, include this room
163 -- in the response
164 if not rooms_with_activity then
165 rooms_with_activity = {};
166 end
167 rooms_with_activity[room_jid] = true;
168 else
169 -- Subscribe to any future activity
170 subscribe_room(user_jid, room_jid);
171 end
172 end
173 return rooms_with_activity;
174 end
175
176 module:hook("presence/host", function (event)
177 local origin, stanza = event.origin, event.stanza;
178 local user_jid = stanza.attr.from;
179
180 if stanza.attr.type == "unavailable" then -- User going offline
181 unsubscribe_user_from_all_rooms(user_jid);
182 return true;
183 end
184
185 local rooms_with_activity, err = subscribe_all_rooms(user_jid);
186
187 if not rooms_with_activity then
188 if not err then
189 module:log("debug", "No activity to notify");
190 return true;
191 else
192 return origin.send(st.error_reply(stanza, "wait", "resource-constraint"));
193 end
194 end
195
196 local reply = st.reply(stanza)
197 :tag("rai", { xmlns = xmlns_rai });
198 for room_jid in pairs(rooms_with_activity) do
199 reply:text_tag("activity", room_jid);
200 end
201 return origin.send(reply);
202 end);
203
204 module:hook("muc-broadcast-message", function (event)
205 local room, stanza = event.room, event.stanza;
206 local archive_id = stanza:get_child_text("stanza-id", "urn:xmpp:sid:0");
207 if archive_id then
208 -- Remember the id of the last message so we can compare it
209 -- to the per-user marker (managed by mod_muc_markers)
210 update_room_activity(room.jid, archive_id);
211 -- Notify any users that need to be notified
212 notify_interested_users(room.jid);
213 end
214 end, -1);
215