Masstransit не создает очередь ошибок для подписчика событий функции Azure - PullRequest
0 голосов
/ 22 февраля 2019

Мы следовали этому примеру (http://masstransit -project.com / MassTransit / использовании / azure-functions.html ), чтобы попытаться настроить функции Azure в качестве подписчиков события (раздела) служебной шины Azure с помощью MassTransit(для .Net CORE 2.1, Функции Azure 2.0).

При использовании веб-заданий Azure это так же просто, как с использованием RabbitMQ, настройте издателя, разрешите подписчику настраивать и настраивать свою очередь, а Masstransit автоматически создает ее.тема для каждого события, перенаправление в очередь и в «queue_error» после неудачной попытки.Вам не нужно ничего настраивать вручную.

Но с помощью функций Azure мы, кажется, вручную (через Service Bus Explorer или шаблоны ARM) должны добавить подписчиков в тему (которая создается издателем при первом опубликованном событии) и очереди также(хотя они даже не кажутся необходимыми, события обрабатываются непосредственно подписчиками темы-функции Azure.).

Возможно, мы делаем что-то не так, из документов я не вижу, что MT не будет, как обычно, настраивать подписчика и создавать очереди при использовании функций Azure.Но это работает, за исключением случаев, когда потребитель генерирует исключение и после того, как все попытки установки были выполнены.Мы просто не получаем событие в очереди недоставленного сообщения, и обычно генерируемая MT очередь ошибок даже не генерируется.

Итак, как мы можем получить MT для создания очередей ошибок и ПЕРЕДАЧИ неудачных событий там?

Наш код:

    [FunctionName("OrderShippedConsumer")]
    public static Task OrderShippedConsumer(
        [ServiceBusTrigger("xyz.events.order/iordershipped", "ordershippedconsumer-queue", Connection = "AzureServiceBus")] Message message, 
        IBinder binder,
        ILogger logger,
        CancellationToken cancellationToken,
        ExecutionContext context)
    {
        var config = CreateConfig(context);

        var handler = Bus.Factory.CreateBrokeredMessageReceiver(binder, cfg =>
        {
            var serviceBusEndpoint = Parse.ConnectionString(config["AzureServiceBus"])["Endpoint"];
            cfg.CancellationToken = cancellationToken;
            cfg.SetLog(logger);
            cfg.InputAddress = new Uri($"{serviceBusEndpoint}{QueueName}");

            cfg.UseRetry(x => x.Intervals(TimeSpan.FromSeconds(5)));
            cfg.Consumer(() => new OrderShippedConsumer(cfg.Log, config));
        });
        return handler.Handle(message);
    }

И код потребителя:

    public OrderShippedConsumer(ILog log, IConfigurationRoot config)
    {
        this.config = config;
        this.log = log;
    }

    public async Task Consume(ConsumeContext<IOrderShipped> context)
    {
       // Handle the event
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...