Comparison

plugins/mod_c2s.lua @ 10811:16bcbd574801

mod_c2s: Run stream open and close events in async thread, fixes #1103 Enables async processing during stream opening and closing.
author Kim Alvefur <zash@zash.se>
date Fri, 08 May 2020 23:58:24 +0200
parent 10727:fa2a89132dfb
child 10849:19e7092e062c
comparison
equal deleted inserted replaced
10810:8a0a923e1ced 10811:16bcbd574801
53 53
54 --- Stream events handlers 54 --- Stream events handlers
55 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; 55 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
56 56
57 function stream_callbacks.streamopened(session, attr) 57 function stream_callbacks.streamopened(session, attr)
58 -- run _streamopened in async context
59 session.thread:run({ stream = "opened", attr = attr });
60 end
61
62 function stream_callbacks._streamopened(session, attr)
58 local send = session.send; 63 local send = session.send;
59 if not attr.to then 64 if not attr.to then
60 session:close{ condition = "improper-addressing", 65 session:close{ condition = "improper-addressing",
61 text = "A 'to' attribute is required on stream headers" }; 66 text = "A 'to' attribute is required on stream headers" };
62 return; 67 return;
119 end 124 end
120 session:close{ condition = "undefined-condition", text = "No stream features to proceed with" }; 125 session:close{ condition = "undefined-condition", text = "No stream features to proceed with" };
121 end 126 end
122 end 127 end
123 128
124 function stream_callbacks.streamclosed(session) 129 function stream_callbacks.streamclosed(session, attr)
130 -- run _streamclosed in async context
131 session.thread:run({ stream = "closed", attr = attr });
132 end
133
134 function stream_callbacks._streamclosed(session)
125 session.log("debug", "Received </stream:stream>"); 135 session.log("debug", "Received </stream:stream>");
126 session:close(false); 136 session:close(false);
127 end 137 end
128 138
129 function stream_callbacks.error(session, error, data) 139 function stream_callbacks.error(session, error, data)
278 session.notopen = true; 288 session.notopen = true;
279 session.stream:reset(); 289 session.stream:reset();
280 end 290 end
281 291
282 session.thread = runner(function (stanza) 292 session.thread = runner(function (stanza)
283 core_process_stanza(session, stanza); 293 if st.is_stanza(stanza) then
294 core_process_stanza(session, stanza);
295 elseif stanza.stream == "opened" then
296 stream_callbacks._streamopened(session, stanza.attr);
297 elseif stanza.stream == "closed" then
298 stream_callbacks._streamclosed(session, stanza.attr);
299 end
284 end, runner_callbacks, session); 300 end, runner_callbacks, session);
285 301
286 local filter = session.filter; 302 local filter = session.filter;
287 function session.data(data) 303 function session.data(data)
288 -- Parse the data, which will store stanzas in session.pending_stanzas 304 -- Parse the data, which will store stanzas in session.pending_stanzas