Подозреваемая проблема метода расширения ConnectHandler в MassTransit 6.2.3 - PullRequest
2 голосов
/ 31 марта 2020

Когда я пытался обновить MassTransit пакет с версии 3.2.4 до версии 6.2.3 в моем проекте, некоторый существующий код перестал работать. Я использую расширение ConnectHandler для подключения обработчиков сообщений к шине после того, как шина уже создана и настроена . Раньше он работал нормально, но, похоже, больше не работает. Обработчик, зарегистрированный с этим расширением, просто не запускается. Альтернативный способ регистрации обработчика с помощью ReceiveEndpoint во время настройки шины, по-прежнему, работает нормально с обеими версиями MassTransit. Вот некоторый упрощенный код:

    [TestFixture]
    public class When_MassTransit_handler_should_consume_published_message
    {
        [Test]
        public void This_works_fine_with_MassTransit_3_2_4_but_never_enters_handler_with_MassTransit_6_2_3()
        {
            var someEvent = new SomeEvent { Data = 123 };

            var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();

            var bus = Bus.Factory.CreateUsingInMemory(config => { });

            bus.Start();

            bus.ConnectHandler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));

            bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();

            if (!tcs.Task.Wait(5000))
                Assert.Fail("Event consuming takes too long (> 5000 ms)"); // fails here for 6.2.3 because the handler never fires

            Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));

            bus.Stop();
        }

        [Test]
        public void This_works_fine_with_both_MassTransit_versions()
        {
            var someEvent = new SomeEvent { Data = 123 };

            var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();

            var bus = Bus.Factory.CreateUsingInMemory(config => 
            {
                config.ReceiveEndpoint("input_queue", endpoint =>
                {
                    endpoint.Handler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
                });
            });

            bus.Start();

            bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();

            if (!tcs.Task.Wait(5000))
                Assert.Fail("Event consuming takes too long (> 5000 ms)");

            Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));

            bus.Stop();
        }

        class SomeEvent
        {
            public int Data;
        }
    }

Произошло ли какое-либо критическое изменение в MassTransit, вызвавшее такое поведение? Это действительно проблема с методом расширения MassTransit или мой код устарел и должен быть скорректирован в соответствии с самой последней версией? Какие корректировки необходимы? Есть ли альтернативный способ присоединить \ отсоединить обработчик к шине после того, как шина уже создана и настроена?

1 Ответ

1 голос
/ 01 апреля 2020

Краткий ответ? Да, транспорт в памяти был изменен так, чтобы он вел себя больше как RabbitMQ. Фактически, он был полностью переписан так, чтобы вести себя так же, как обмены и очереди RabbitMQ. Таким образом, в случае ConnectHandler и любого из методов ConnectXxx он больше не создает привязку к обмену сообщениями для типа сообщения обработчика. Транспорт в памяти также был изменен, так что он не может совместно использоваться несколькими экземплярами шины, каждая шина, созданная с использованием транспорта в памяти, имеет свое собственное сообщение fabri c.

Чтобы соответствовать предыдущему поведению, используйте вместо этого следующий код:

await bus.ConnectReceiveEndpoint("queue-name", x =>
{
    x.Handler<SomeEvent>(...);
});

Для сообщений, отправляемых непосредственно на адрес шины, ConnectHandler работает нормально. Только опубликованные сообщения не будут поступать, так как биржи publi sh больше не привязаны к очереди получения конечной точки шины.

...