File

spec/util_async_spec.lua @ 11866:515a89dee6ae

util.startup: Skip config readability check in migrator (thanks eTaurus) This field is empty for reasons when invoked by prosody-migrator, which threw an error: > bad argument #1 to 'open' (string expected, got nil)
author Kim Alvefur <zash@zash.se>
date Sat, 23 Oct 2021 22:24:59 +0200
parent 10541:6c6ff4509082
child 11961:542a9a503073
line wrap: on
line source

local async = require "util.async";

describe("util.async", function()
	local debug = false;
	local print = print;
	if debug then
		require "util.logger".add_simple_sink(print);
	else
		print = function () end
	end

	local function mock_watchers(event_log)
		local function generic_logging_watcher(name)
			return function (...)
				table.insert(event_log, { name = name, n = select("#", ...)-1, select(2, ...) });
			end;
		end;
		return setmetatable(mock{
			ready = generic_logging_watcher("ready");
			waiting = generic_logging_watcher("waiting");
			error = generic_logging_watcher("error");
		}, {
			__index = function (_, event)
				-- Unexpected watcher called
				assert(false, "unexpected watcher called: "..event);
			end;
		})
	end

	local function new(func)
		local event_log = {};
		local spy_func = spy.new(func);
		return async.runner(spy_func, mock_watchers(event_log)), spy_func, event_log;
	end
	describe("#runner", function()
		it("should work", function()
			local r = new(function (item) assert(type(item) == "number") end);
			r:run(1);
			r:run(2);
		end);

		it("should be ready after creation", function ()
			local r = new(function () end);
			assert.equal(r.state, "ready");
		end);

		it("should do nothing if the queue is empty", function ()
			local did_run;
			local r = new(function () did_run = true end);
			r:run();
			assert.equal(r.state, "ready");
			assert.is_nil(did_run);
			r:run("hello");
			assert.is_true(did_run);
		end);

		it("should support queuing work items without running", function ()
			local did_run;
			local r = new(function () did_run = true end);
			r:enqueue("hello");
			assert.equal(r.state, "ready");
			assert.is_nil(did_run);
			r:run();
			assert.is_true(did_run);
		end);

		it("should support queuing multiple work items", function ()
			local last_item;
			local r, s = new(function (item) last_item = item; end);
			r:enqueue("hello");
			r:enqueue("there");
			r:enqueue("world");
			assert.equal(r.state, "ready");
			r:run();
			assert.equal(r.state, "ready");
			assert.spy(s).was.called(3);
			assert.equal(last_item, "world");
		end);

		it("should support all simple data types", function ()
			local last_item;
			local r, s = new(function (item) last_item = item; end);
			local values = { {}, 123, "hello", true, false };
			for i = 1, #values do
				r:enqueue(values[i]);
			end
			assert.equal(r.state, "ready");
			r:run();
			assert.equal(r.state, "ready");
			assert.spy(s).was.called(#values);
			for i = 1, #values do
				assert.spy(s).was.called_with(values[i]);
			end
			assert.equal(last_item, values[#values]);
		end);

		it("should work with no parameters", function ()
			local item = "fail";
			local r = async.runner();
			local f = spy.new(function () item = "success"; end);
			r:run(f);
			assert.spy(f).was.called();
			assert.equal(item, "success");
		end);

		it("supports a default error handler", function ()
			local item = "fail";
			local r = async.runner();
			local f = spy.new(function () error("test error"); end);
			assert.error_matches(function ()
				r:run(f);
			end, "test error");
			assert.spy(f).was.called();
			assert.equal(item, "fail");
		end);

		describe("#errors", function ()
			describe("should notify", function ()
				local last_processed_item, last_error;
				local r;
				r = async.runner(function (item)
					if item == "error" then
						error({ e = "test error" });
					end
					last_processed_item = item;
				end, mock{
					ready = function () end;
					waiting = function () end;
					error = function (runner, err)
						assert.equal(r, runner);
						last_error = err;
					end;
				});

				-- Simple item, no error
				r:run("hello");
				assert.equal(r.state, "ready");
				assert.equal(last_processed_item, "hello");
				assert.spy(r.watchers.ready).was_not.called();
				assert.spy(r.watchers.error).was_not.called();

				-- Trigger an error inside the runner
				assert.equal(last_error, nil);
				r:run("error");
				test("the correct watcher functions", function ()
					-- Only the error watcher should have been called
					assert.spy(r.watchers.ready).was_not.called();
					assert.spy(r.watchers.waiting).was_not.called();
					assert.spy(r.watchers.error).was.called(1);
				end);
				test("with the correct error", function ()
					-- The error watcher state should be correct, to
					-- demonstrate the error was passed correctly
					assert.is_table(last_error);
					assert.equal(last_error.e, "test error");
					last_error = nil;
				end);
				assert.equal(r.state, "ready");
				assert.equal(last_processed_item, "hello");
			end);

			do
				local last_processed_item, last_error;
				local r;
				local wait, done;
				r = async.runner(function (item)
					if item == "error" then
						error({ e = "test error" });
					elseif item == "wait" then
						wait, done = async.waiter();
						wait();
						error({ e = "post wait error" });
					end
					last_processed_item = item;
				end, mock({
					ready = function () end;
					waiting = function () end;
					error = function (runner, err)
						assert.equal(r, runner);
						last_error = err;
					end;
				}));

				randomize(false); --luacheck: ignore 113/randomize

				it("should not be fatal to the runner", function ()
					r:run("world");
					assert.equal(r.state, "ready");
					assert.spy(r.watchers.ready).was_not.called();
					assert.equal(last_processed_item, "world");
				end);
				it("should work despite a #waiter", function ()
					-- This test covers an important case where a runner
					-- throws an error while being executed outside of the
					-- main loop. This happens when it was blocked ('waiting'),
					-- and then released (via a call to done()).
					last_error = nil;
					r:run("wait");
					assert.equal(r.state, "waiting");
					assert.spy(r.watchers.waiting).was.called(1);
					done();
					-- At this point an error happens (state goes error->ready)
					assert.equal(r.state, "ready");
					assert.spy(r.watchers.error).was.called(1);
					assert.spy(r.watchers.ready).was.called(1);
					assert.is_table(last_error);
					assert.equal(last_error.e, "post wait error");
					last_error = nil;
					r:run("hello again");
					assert.spy(r.watchers.ready).was.called(1);
					assert.spy(r.watchers.waiting).was.called(1);
					assert.spy(r.watchers.error).was.called(1);
					assert.equal(r.state, "ready");
					assert.equal(last_processed_item, "hello again");
				end);
			end

			it("should continue to process work items", function ()
				local last_item;
				local runner, runner_func = new(function (item)
					if item == "error" then
						error("test error");
					end
					last_item = item;
				end);
				runner:enqueue("one");
				runner:enqueue("error");
				runner:enqueue("two");
				runner:run();
				assert.equal(runner.state, "ready");
				assert.spy(runner_func).was.called(3);
				assert.spy(runner.watchers.error).was.called(1);
				assert.spy(runner.watchers.ready).was.called(0);
				assert.spy(runner.watchers.waiting).was.called(0);
				assert.equal(last_item, "two");
			end);

			it("should continue to process work items during resume", function ()
				local wait, done, last_item;
				local runner, runner_func = new(function (item)
					if item == "wait-error" then
						wait, done = async.waiter();
						wait();
						error("test error");
					end
					last_item = item;
				end);
				runner:enqueue("one");
				runner:enqueue("wait-error");
				runner:enqueue("two");
				runner:run();
				done();
				assert.equal(runner.state, "ready");
				assert.spy(runner_func).was.called(3);
				assert.spy(runner.watchers.error).was.called(1);
				assert.spy(runner.watchers.waiting).was.called(1);
				assert.spy(runner.watchers.ready).was.called(1);
				assert.equal(last_item, "two");
			end);
		end);
	end);
	describe("#waiter", function()
		it("should error outside of async context", function ()
			assert.has_error(function ()
				async.waiter();
			end);
		end);
		it("should work", function ()
			local wait, done;

			local r = new(function (item)
				assert(type(item) == "number")
				if item == 3 then
					wait, done = async.waiter();
					wait();
				end
			end);

			r:run(1);
			assert(r.state == "ready");
			r:run(2);
			assert(r.state == "ready");
			r:run(3);
			assert(r.state == "waiting");
			done();
			assert(r.state == "ready");
			--for k, v in ipairs(l) do print(k,v) end
		end);

		it("should work", function ()
			--------------------
			local wait, done;
			local last_item = 0;
			local r = new(function (item)
				assert(type(item) == "number")
				assert(item == last_item + 1);
				last_item = item;
				if item == 3 then
					wait, done = async.waiter();
					wait();
				end
			end);

			r:run(1);
			assert(r.state == "ready");
			r:run(2);
			assert(r.state == "ready");
			r:run(3);
			assert(r.state == "waiting");
			r:run(4);
			assert(r.state == "waiting");
			done();
			assert(r.state == "ready");
			--for k, v in ipairs(l) do print(k,v) end
		end);
		it("should work", function ()
			--------------------
			local wait, done;
			local last_item = 0;
			local r = new(function (item)
				assert(type(item) == "number")
				assert((item == last_item + 1) or item == 3);
				last_item = item;
				if item == 3 then
					wait, done = async.waiter();
					wait();
				end
			end);

			r:run(1);
			assert(r.state == "ready");
			r:run(2);
			assert(r.state == "ready");

			r:run(3);
			assert(r.state == "waiting");
			r:run(3);
			assert(r.state == "waiting");
			r:run(3);
			assert(r.state == "waiting");
			r:run(4);
			assert(r.state == "waiting");

			for i = 1, 3 do
				done();
				if i < 3 then
					assert(r.state == "waiting");
				end
			end

			assert(r.state == "ready");
			--for k, v in ipairs(l) do print(k,v) end
		end);
		it("should work", function ()
			--------------------
			local wait, done;
			local last_item = 0;
			local r = new(function (item)
				assert(type(item) == "number")
				assert((item == last_item + 1) or item == 3);
				last_item = item;
				if item == 3 then
					wait, done = async.waiter();
					wait();
				end
			end);

			r:run(1);
			assert(r.state == "ready");
			r:run(2);
			assert(r.state == "ready");

			r:run(3);
			assert(r.state == "waiting");
			r:run(3);
			assert(r.state == "waiting");

			for i = 1, 2 do
				done();
				if i < 2 then
					assert(r.state == "waiting");
				end
			end

			assert(r.state == "ready");
			r:run(4);
			assert(r.state == "ready");

			assert(r.state == "ready");
			--for k, v in ipairs(l) do print(k,v) end
		end);
		it("should work with multiple runners in parallel", function ()
			-- Now with multiple runners
			--------------------
			local wait1, done1;
			local last_item1 = 0;
			local r1 = new(function (item)
				assert(type(item) == "number")
				assert((item == last_item1 + 1) or item == 3);
				last_item1 = item;
				if item == 3 then
					wait1, done1 = async.waiter();
					wait1();
				end
			end, "r1");

			local wait2, done2;
			local last_item2 = 0;
			local r2 = new(function (item)
				assert(type(item) == "number")
				assert((item == last_item2 + 1) or item == 3);
				last_item2 = item;
				if item == 3 then
					wait2, done2 = async.waiter();
					wait2();
				end
			end, "r2");

			r1:run(1);
			assert(r1.state == "ready");
			r1:run(2);
			assert(r1.state == "ready");

			r1:run(3);
			assert(r1.state == "waiting");
			r1:run(3);
			assert(r1.state == "waiting");

			r2:run(1);
			assert(r1.state == "waiting");
			assert(r2.state == "ready");

			r2:run(2);
			assert(r1.state == "waiting");
			assert(r2.state == "ready");

			r2:run(3);
			assert(r1.state == "waiting");
			assert(r2.state == "waiting");
			done2();

			r2:run(3);
			assert(r1.state == "waiting");
			assert(r2.state == "waiting");
			done2();

			r2:run(4);
			assert(r1.state == "waiting");
			assert(r2.state == "ready");

			for i = 1, 2 do
				done1();
				if i < 2 then
					assert(r1.state == "waiting");
				end
			end

			assert(r1.state == "ready");
			r1:run(4);
			assert(r1.state == "ready");

			assert(r1.state == "ready");
			--for k, v in ipairs(l1) do print(k,v) end
		end);
		it("should work work with multiple runners in parallel", function ()
			--------------------
			local wait1, done1;
			local last_item1 = 0;
			local r1 = new(function (item)
				print("r1 processing ", item);
				assert(type(item) == "number")
				assert((item == last_item1 + 1) or item == 3);
				last_item1 = item;
				if item == 3 then
					wait1, done1 = async.waiter();
					wait1();
				end
			end, "r1");

			local wait2, done2;
			local last_item2 = 0;
			local r2 = new(function (item)
				print("r2 processing ", item);
				assert.is_number(item);
				assert((item == last_item2 + 1) or item == 3);
				last_item2 = item;
				if item == 3 then
					wait2, done2 = async.waiter();
					wait2();
				end
			end, "r2");

			r1:run(1);
			assert.equal(r1.state, "ready");
			r1:run(2);
			assert.equal(r1.state, "ready");

			r1:run(5);
			assert.equal(r1.state, "ready");

			r1:run(3);
			assert.equal(r1.state, "waiting");
			r1:run(5); -- Will error, when we get to it
			assert.equal(r1.state, "waiting");
			done1();
			assert.equal(r1.state, "ready");
			r1:run(3);
			assert.equal(r1.state, "waiting");

			r2:run(1);
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "ready");

			r2:run(2);
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "ready");

			r2:run(3);
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "waiting");

			done2();
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "ready");

			r2:run(3);
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "waiting");

			done2();
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "ready");

			r2:run(4);
			assert.equal(r1.state, "waiting");
			assert.equal(r2.state, "ready");

			done1();

			assert.equal(r1.state, "ready");
			r1:run(4);
			assert.equal(r1.state, "ready");

			assert.equal(r1.state, "ready");
		end);

		-- luacheck: ignore 211/rf
		-- FIXME what's rf?
		it("should support multiple done() calls", function ()
			local processed_item;
			local wait, done;
			local r, rf = new(function (item)
				wait, done = async.waiter(4);
				wait();
				processed_item = item;
			end);
			r:run("test");
			for _ = 1, 3 do
				done();
				assert.equal(r.state, "waiting");
				assert.is_nil(processed_item);
			end
			done();
			assert.equal(r.state, "ready");
			assert.equal(processed_item, "test");
			assert.spy(r.watchers.error).was_not.called();
		end);

		it("should not allow done() to be called more than specified", function ()
			local processed_item;
			local wait, done;
			local r, rf = new(function (item)
				wait, done = async.waiter(4);
				wait();
				processed_item = item;
			end);
			r:run("test");
			for _ = 1, 4 do
				done();
			end
			assert.has_error(done);
			assert.equal(r.state, "ready");
			assert.equal(processed_item, "test");
			assert.spy(r.watchers.error).was_not.called();
		end);

		it("should allow done() to be called before wait()", function ()
			local processed_item;
			local r, rf = new(function (item)
				local wait, done = async.waiter();
				done();
				wait();
				processed_item = item;
			end);
			r:run("test");
			assert.equal(processed_item, "test");
			assert.equal(r.state, "ready");
			-- Since the observable state did not change,
			-- the watchers should not have been called
			assert.spy(r.watchers.waiting).was_not.called();
			assert.spy(r.watchers.ready).was_not.called();
		end);
	end);

	describe("#ready()", function ()
		it("should return false outside an async context", function ()
			assert.falsy(async.ready());
		end);
		it("should return true inside an async context", function ()
			local r = new(function ()
				assert.truthy(async.ready());
			end);
			r:run(true);
			assert.spy(r.func).was.called();
			assert.spy(r.watchers.error).was_not.called();
		end);
	end);
end);