File

mod_muc_rai/mod_muc_rai.lua @ 4282:281a864e7472

mod_pubsub_feeds: Don't skip publishing items after an existing one I encountered a feed which was backwards, such that older entries were considered first and then it would skip newer entries. This may however run into trouble if the feed contains more items than what's persisted in pubsub.
author Kim Alvefur <zash@zash.se>
date Mon, 30 Nov 2020 15:17:29 +0100
parent 4281:3c80e46e26f2
child 4297:4a5c4a352b78
line wrap: on
line source

local cache = require "util.cache";
local jid = require "util.jid";
local st = require "util.stanza";

local max_subscribers = module:get_option_number("muc_rai_max_subscribers", 1024);

local muc_affiliation_store = module:open_store("config", "map");
local muc_archive = module:open_store("muc_log", "archive");

local xmlns_rai = "xmpp:prosody.im/protocol/rai";

local muc_markers = module:depends("muc_markers");

-- subscriber_jid -> { [room_jid] = interested }
local subscribed_users = cache.new(max_subscribers, false);
-- room_jid -> { [user_jid] = interested }
local interested_users = {};
-- room_jid -> last_id
local room_activity_cache = cache.new(1024);

-- Send a single notification for a room, updating data structures as needed
local function send_single_notification(user_jid, room_jid)
	local notification = st.message({ to = user_jid, from = module.host })
		:tag("rai", { xmlns = xmlns_rai })
			:text_tag("activity", room_jid)
		:up();
	local interested_room_users = interested_users[room_jid];
	if interested_room_users then
		interested_room_users[user_jid] = nil;
	end
	local interested_rooms = subscribed_users:get(user_jid);
	if interested_rooms then
		interested_rooms[room_jid] = nil;
	end
	module:log("debug", "Sending notification from %s to %s", room_jid, user_jid);
	return module:send(notification);
end

local function subscribe_room(user_jid, room_jid)
	local interested_rooms = subscribed_users:get(user_jid);
	if not interested_rooms then
		return nil, "not-subscribed";
	end
	module:log("debug", "Subscribed %s to %s", user_jid, room_jid);
	interested_rooms[room_jid] = true;

	local interested_room_users = interested_users[room_jid];
	if not interested_room_users then
		interested_room_users = {};
		interested_users[room_jid] = interested_room_users;
	end
	interested_room_users[user_jid] = true;
	return true;
end

local function unsubscribe_room(user_jid, room_jid)
	local interested_rooms = subscribed_users:get(user_jid);
	if not interested_rooms then
		return nil, "not-subscribed";
	end
	interested_rooms[room_jid] = nil;

	local interested_room_users = interested_users[room_jid];
	if not interested_room_users then
		return true;
	end
	interested_room_users[user_jid] = nil;
	return true;
end

local function notify_interested_users(room_jid)
	module:log("warn", "NOTIFYING FOR %s", room_jid)
	local interested_room_users = interested_users[room_jid];
	if not interested_room_users then
		module:log("debug", "Nobody interested in %s", room_jid);
		return;
	end
	for user_jid in pairs(interested_room_users) do
		send_single_notification(user_jid, room_jid);
	end
	return true;
end

local function unsubscribe_user_from_all_rooms(user_jid)
	local interested_rooms = subscribed_users:get(user_jid);
	if not interested_rooms then
		return nil, "not-subscribed";
	end
	for room_jid in pairs(interested_rooms) do
		unsubscribe_room(user_jid, room_jid);
	end
	return true;
end

local function get_last_room_message_id(room_jid)
	local last_room_message_id = room_activity_cache:get(room_jid);
	if last_room_message_id then
		return last_room_message_id;
	end

	-- Load all the data!
	local query = {
		limit = 1;
		reverse = true;
		with = "message<groupchat";
	}
	local data, err = muc_archive:find(jid.node(room_jid), query);

	if not data then
		module:log("error", "Could not fetch history: %s", err);
		return nil;
	end

	local id = data();
	room_activity_cache:set(room_jid, id);
	return id;
end

local function update_room_activity(room_jid, last_id)
	room_activity_cache:set(room_jid, last_id);
end

local function get_last_user_read_id(user_jid, room_jid)
	return muc_markers.get_user_read_marker(jid.bare(user_jid), room_jid);
end

local function has_new_activity(room_jid, user_jid)
	local last_room_message_id = get_last_room_message_id(room_jid);
	local last_user_read_id = get_last_user_read_id(user_jid, room_jid);
	module:log("debug", "Checking activity in <%s> (%s) for <%s> (%s): %s",
		room_jid, last_room_message_id,
		user_jid, last_user_read_id,
		tostring(last_room_message_id ~= last_user_read_id)
	);
	return last_room_message_id ~= last_user_read_id;
end

-- Returns a set of rooms that a user is interested in
local function get_interested_rooms(user_jid)
	-- Use affiliation as an indication of interest, return
	-- all rooms a user is affiliated
	return muc_affiliation_store:get_all(jid.bare(user_jid));
end

local function is_subscribed(user_jid)
	return not not subscribed_users:get(user_jid);
end

-- Subscribes to all rooms that the user has an interest in
-- Returns a set of room JIDs that have already had activity (thus no subscription)
local function subscribe_all_rooms(user_jid)
	if is_subscribed(user_jid) then
		return nil;
	end

	-- Send activity notifications for all relevant rooms
	local interested_rooms, err = get_interested_rooms(user_jid);

	if not interested_rooms then
		if err then
			return nil, "internal-server-error";
		end
		interested_rooms = {};
	end

	if not subscribed_users:set(user_jid, {}) then
		module:log("warn", "Subscriber limit (%d) reached, rejecting subscription from %s", max_subscribers, user_jid);
		return nil, "resource-constraint";
	end

	local rooms_with_activity;
	for room_name in pairs(interested_rooms) do
		local room_jid = room_name.."@"..module.host;
		if has_new_activity(room_jid, user_jid) then
			-- There has already been activity, include this room
			-- in the response
			if not rooms_with_activity then
				rooms_with_activity = {};
			end
			rooms_with_activity[room_jid] = true;
		else
			-- Subscribe to any future activity
			subscribe_room(user_jid, room_jid);
		end
	end
	return rooms_with_activity;
end

module:hook("muc-occupant-joined", function(event)
	local room_jid, user_jid = event.room.jid, event.stanza.attr.from;
	local ok, err = unsubscribe_room(user_jid, room_jid);
	if ok then
		module:log("debug", "Unsubscribed %s to %s Reason: muc-occupant-joined", user_jid, room_jid)
	end
end);

module:hook("muc-occupant-left", function(event)
	local room_jid, user_jid = event.room.jid, event.stanza.attr.from;
	local ok, err = subscribe_room(user_jid, room_jid);
	if ok then
		module:log("debug", "Subscribed %s to %s Reason: muc-occupant-left", user_jid, room_jid)
	end
end);

module:hook("presence/host", function (event)
	local origin, stanza = event.origin, event.stanza;
	local user_jid = stanza.attr.from;

	if stanza.attr.type == "unavailable" then -- User going offline
		unsubscribe_user_from_all_rooms(user_jid);
		return true;
	end

	if not stanza:get_child("rai", xmlns_rai) then
		return; -- Ignore, no <rai/> tag
	end

	local rooms_with_activity, err = subscribe_all_rooms(user_jid);

	if not rooms_with_activity then
		if not err then
			module:log("debug", "No activity to notify");
			return true;
		else
			return origin.send(st.error_reply(stanza, "wait", "resource-constraint"));
		end
	end

	local reply = st.message({ to = stanza.attr.from, from = module.host })
		:tag("rai", { xmlns = xmlns_rai });
	for room_jid in pairs(rooms_with_activity) do
		reply:text_tag("activity", room_jid);
	end
	return origin.send(reply);
end);

module:hook("muc-broadcast-message", function (event)
	local room, stanza = event.room, event.stanza;
	local archive_id = stanza:get_child("stanza-id", "urn:xmpp:sid:0");
	if archive_id and archive_id.attr.id then
		-- Remember the id of the last message so we can compare it
		-- to the per-user marker (managed by mod_muc_markers)
		update_room_activity(room.jid, archive_id.attr.id);
		-- Notify any users that need to be notified
		notify_interested_users(room.jid);
	end
end, -1);