Comparison

plugins/mod_debug_stanzas/watcher.lib.lua @ 12463:788048158982

mod_debug_stanzas/watcher: New module library to dynamically 'watch' for stanzas
author Matthew Wild <mwild1@gmail.com>
date Wed, 23 Mar 2022 13:42:44 +0000 (2022-03-23)
child 12977:74b9e05af71e
comparison
equal deleted inserted replaced
12462:11765f0605ec 12463:788048158982
1 local filters = require "util.filters";
2 local jid = require "util.jid";
3 local set = require "util.set";
4
5 local client_watchers = {}; -- list of { target_spec, handler, orig_handler } records added by add_stanza_watcher for non-s2s targets
6
7 -- active_filters[session] = {
8 -- filter_in = filter_func; filter_out = filter_func; -- the functions added to "stanzas/in" and "stanzas/out"
9 -- downstream = { cb1, cb2, ... }; -- each is called as cb(event_type, stanza, session)
10 -- }
11 local active_filters = {};
12
-- Attach `handler` to the stanza stream of `session`.
--
-- The first subscription for a session adds one "stanzas/in" and one
-- "stanzas/out" filter and records them in active_filters[session]; later
-- subscriptions only append to that record's handler list. Both filters
-- return the stanza they were given.
--
-- session: session whose stanzas should be observed
-- handler: called as handler(event_type, stanza, session), with event_type
--          "received" for the inbound filter and "sent" for the outbound one
-- reason:  optional; when set, handler(reason, nil, session) is called once
--          right after attaching
local function subscribe_session_stanzas(session, handler, reason)
	local entry = active_filters[session];
	if entry then
		-- Filters already exist for this session: just join the list.
		table.insert(entry.downstream, handler);
	else
		local listeners = { handler };

		-- Fan one stanza out to every listener, then hand it back to the
		-- filter chain.
		local function notify(event_type, stanza)
			local count = #listeners;
			module:log("debug", "NOTIFY WATCHER %d", count);
			for idx = 1, count do
				listeners[idx](event_type, stanza, session);
			end
			return stanza;
		end

		entry = {
			filter_in = function (stanza)
				return notify("received", stanza);
			end;
			filter_out = function (stanza)
				return notify("sent", stanza);
			end;
			downstream = listeners;
		};
		active_filters[session] = entry;
		filters.add_filter(session, "stanzas/in", entry.filter_in);
		filters.add_filter(session, "stanzas/out", entry.filter_out);
	end

	if reason then
		handler(reason, nil, session);
	end
end
45
-- Detach `handler` from the stanza stream of `session`.
--
-- Every occurrence of `handler` in the session's handler list is removed.
-- The session's filters and its active_filters record are torn down only
-- when no handlers remain; while other handlers are still attached the
-- record must stay, otherwise those handlers could never be detached and a
-- later subscribe would install a second pair of filters.
--
-- session: session to detach from (no-op if it has no active filters)
-- handler: the handler function previously subscribed
-- reason:  optional; when set, handler(reason, nil, session) is called for
--          each occurrence that was removed
local function unsubscribe_session_stanzas(session, handler, reason)
	local active_filter = active_filters[session];
	if not active_filter then
		return;
	end

	local downstream = active_filter.downstream;
	-- Walk backwards so table.remove() never shifts an entry we still have
	-- to look at.
	for i = #downstream, 1, -1 do
		if downstream[i] == handler then
			table.remove(downstream, i);
			if reason then
				handler(reason, nil, session);
			end
		end
	end

	if #downstream == 0 then
		-- Last handler gone: remove the filters and forget the session.
		filters.remove_filter(session, "stanzas/in", active_filter.filter_in);
		filters.remove_filter(session, "stanzas/out", active_filter.filter_out);
		active_filters[session] = nil;
	end
end
65
-- Detach every handler from `session`, remove its two stanza filters and
-- drop its active_filters record. Does nothing if the session has no record.
--
-- session: session to stop observing
-- reason:  optional; when set, each removed handler is called as
--          handler(reason, nil, session)
local function unsubscribe_all_from_session(session, reason)
	local entry = active_filters[session];
	if entry then
		local listeners = entry.downstream;
		-- Pop from the end so the remaining entries keep their positions.
		for idx = #listeners, 1, -1 do
			local listener = table.remove(listeners, idx);
			if reason then
				listener(reason, nil, session);
			end
		end

		filters.remove_filter(session, "stanzas/in", entry.filter_in);
		filters.remove_filter(session, "stanzas/out", entry.filter_out);
		active_filters[session] = nil;
	end
end
81
-- Run unsubscribe_session_stanzas(session, handler, reason) for every
-- session that currently has a record in active_filters.
local function unsubscribe_handler_from_all(handler, reason)
	for watched_session in next, active_filters do
		unsubscribe_session_stanzas(watched_session, handler, reason);
	end
end
87
88 local s2s_watchers = {}; -- list of { target_spec, handler, orig_handler } records added by add_stanza_watcher for remote-host targets
89
-- On "s2sin-established", subscribe every s2s watcher whose target equals
-- the new session's from_host; the handler is notified with "opened".
module:hook("s2sin-established", function (event)
	local session = event.session;
	for _, entry in ipairs(s2s_watchers) do
		if session.from_host == entry.target_spec then
			subscribe_session_stanzas(session, entry.handler, "opened");
		end
	end
end);
97
-- On "s2sout-established", subscribe every s2s watcher whose target equals
-- the new session's to_host; the handler is notified with "opened".
module:hook("s2sout-established", function (event)
	local session = event.session;
	for _, entry in ipairs(s2s_watchers) do
		if session.to_host == entry.target_spec then
			subscribe_session_stanzas(session, entry.handler, "opened");
		end
	end
end);
105
-- On "s2s-closed", detach all handlers from the session; each one is
-- notified with "closed".
module:hook("s2s-closed", function (event)
	local closed_session = event.session;
	unsubscribe_all_from_session(closed_session, "closed");
end);
109
110 local watched_hosts = set.new(); -- hosts for which add_stanza_watcher has already installed resource-bind/unbind hooks
111
112 local handler_map = setmetatable({}, { __mode = "kv" }); -- orig_handler -> its filtering wrapper; weak keys and values
113
-- Register a watcher that reports stanzas for sessions matching `spec`.
--
-- spec.target_spec.jid selects what to watch:
--   * a JID with no node part that is not a key of prosody.hosts is treated
--     as a remote host: matching s2s sessions are watched, both the ones
--     that exist now and the ones reported later by the s2s hooks;
--   * anything else is treated as a client target: sessions whose full JID
--     matches (jid.compare) are watched, now and on later "resource-bind".
-- spec.filter_spec.with_jid (optional) restricts which stanzas are passed on:
-- a "sent" event needs a matching `from`, a "received" event needs a
-- matching `to`. Events without a stanza are never filtered.
--
-- orig_handler is called as orig_handler(event_type, stanza, session).
-- It is also the key later passed to remove_stanza_watcher.
--
-- Raises "No recognized target selector" when spec carries no
-- target_spec.jid. Validation happens before any state is touched, so a
-- rejected spec leaves nothing registered.
local function add_stanza_watcher(spec, orig_handler)
	local target_jid = spec.target_spec and spec.target_spec.jid;
	if not target_jid then
		error("No recognized target selector");
	end

	-- Wrapper that applies filter_spec before forwarding to orig_handler.
	local function filtering_handler(event_type, stanza, session)
		if stanza and spec.filter_spec then
			local with_jid = spec.filter_spec.with_jid;
			if with_jid then
				if event_type == "sent" and (not stanza.attr.from or not jid.compare(stanza.attr.from, with_jid)) then
					return;
				elseif event_type == "received" and (not stanza.attr.to or not jid.compare(stanza.attr.to, with_jid)) then
					return;
				end
			end
		end
		return orig_handler(event_type, stanza, session);
	end
	-- Remember the wrapper so remove_stanza_watcher can find it again.
	handler_map[orig_handler] = filtering_handler;

	local target_is_remote_host = not jid.node(target_jid) and not prosody.hosts[target_jid];

	if target_is_remote_host then
		-- Watch s2s sessions
		table.insert(s2s_watchers, {
			target_spec = target_jid;
			handler = filtering_handler;
			orig_handler = orig_handler;
		});

		-- Scan existing s2sin for matches
		for session in pairs(prosody.incoming_s2s) do
			if target_jid == session.from_host then
				subscribe_session_stanzas(session, filtering_handler, "attached");
			end
		end
		-- Scan existing s2sout for matches
		for _, local_session in pairs(prosody.hosts) do
			for remote_host, remote_session in pairs(local_session.s2sout) do
				if target_jid == remote_host then
					subscribe_session_stanzas(remote_session, filtering_handler, "attached");
				end
			end
		end
	else
		table.insert(client_watchers, {
			target_spec = target_jid;
			handler = filtering_handler;
			orig_handler = orig_handler;
		});

		-- Install the per-host hooks only once per host, and only for hosts
		-- present in prosody.hosts.
		local host = jid.host(target_jid);
		if not watched_hosts:contains(host) and prosody.hosts[host] then
			module:context(host):hook("resource-bind", function (event)
				for _, watcher in ipairs(client_watchers) do
					module:log("debug", "NEW CLIENT: %s vs %s", event.session.full_jid, watcher.target_spec);
					if jid.compare(event.session.full_jid, watcher.target_spec) then
						module:log("debug", "MATCH");
						subscribe_session_stanzas(event.session, watcher.handler, "opened");
					else
						module:log("debug", "NO MATCH");
					end
				end
			end);

			module:context(host):hook("resource-unbind", function (event)
				unsubscribe_all_from_session(event.session, "closed");
			end);

			watched_hosts:add(host);
		end

		-- Attach to sessions that are already bound.
		for full_jid, session in pairs(prosody.full_sessions) do
			if jid.compare(full_jid, target_jid) then
				subscribe_session_stanzas(session, filtering_handler, "attached");
			end
		end
	end
end
189
-- Unregister the watcher that was added with `orig_handler`.
--
-- The filtering wrapper recorded for orig_handler is detached from every
-- session (the handler is notified with "detached"), and all client/s2s
-- watcher records created for orig_handler are dropped so that future
-- sessions no longer attach it.
--
-- Calling this for a handler that has no recorded wrapper (never added, or
-- already removed) only prunes the watcher lists; no nil handler is passed
-- on to the unsubscribe machinery.
local function remove_stanza_watcher(orig_handler)
	local handler = handler_map[orig_handler];
	if handler then
		unsubscribe_handler_from_all(handler, "detached");
		handler_map[orig_handler] = nil;
	end

	-- Iterate backwards: table.remove() shifts later entries down.
	for i = #client_watchers, 1, -1 do
		if client_watchers[i].orig_handler == orig_handler then
			table.remove(client_watchers, i);
		end
	end

	for i = #s2s_watchers, 1, -1 do
		if s2s_watchers[i].orig_handler == orig_handler then
			table.remove(s2s_watchers, i);
		end
	end
end
207
-- Forget all client and s2s watchers, then detach every handler from every
-- session that has active filters.
--
-- reason: optional event type passed to each detached handler; defaults to
--         "cancelled"
local function cleanup(reason)
	local why = reason or "cancelled";
	client_watchers, s2s_watchers = {}, {};
	for watched_session in next, active_filters do
		unsubscribe_all_from_session(watched_session, why);
	end
end
215
216 return {
217 add = add_stanza_watcher; -- add(spec, handler)
218 remove = remove_stanza_watcher; -- remove(handler), with the same handler that was given to add
219 cleanup = cleanup; -- cleanup([reason]); reason defaults to "cancelled"
220 };