Перемещение сообщений из очереди _error при использовании Azure Темы служебной шины - PullRequest
0 голосов
/ 09 июля 2020

Я пытаюсь использовать Azure Темы служебной шины, чтобы разрешить службам получать сообщения с помощью MassTransit. Сообщение отправляется в топи c, и каждая служба, имеющая подписку на топи c, получает копию сообщения. Каждый тип сообщения получает topi c.

Проблема в том, как поступать с ошибками после обращения к источнику исключения. Как только сообщение выходит из строя и попадает в очередь _error, я хотел бы вернуть сообщение для обработки после исправления сообщения и / или службы. Я не могу переместить сообщение из очереди _error в Topi c, потому что каждая служба в этом topi c снова получит сообщение.

Я попытался создать вторую очередь с помощью метода ReceiveEndpoint под названием _errorecovery, но это привело к тому, что очередь подписалась на topi c, что означает, что очередь _errorrecovery получает каждое сообщение, опубликованное для этого topi c.

Мне интересно, есть ли способ, которым я могу настроить очередь используя MassTransit, который будет обрабатывать только сообщения в этой очереди без добавления дополнительной подписки.

Вот моя текущая настройка для создания тем.

TEvent - это тип сообщения, а TConsumer - связанная реализация IConsumer для этого типа сообщения.

  public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
        {
            string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
            var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));

            busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
            {
                configurator.ConfigureConsumer(container, typeof(TConsumer));
                if (!(options is null))
                {
                    ConfigureRetry(configurator, options);
                }
            });
        }

И для создания очереди _errorrecovery. Каждое событие также имеет связанный IConsumer специально для обработки сбойных событий.

 var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);

                        busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
                        {
                            config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
                        });

Это создает очередь с именем subname_errorrecovery и Topi c, названную в честь события. У сервиса есть подписка в Topi c, но есть и _errorrecovery. Таким образом, каждый раз, когда сообщение отправляется в Topi c, потребители события и получатели ошибки получают сообщения.

Итак, я ищу способ подключить службу к очередь восстановления, а также несколько тем без этой очереди, а также подписка на все топи c.

Также очень возможно, что я поступаю неправильно. Возможно, мне нужно просто добавить проверку на наличие повторяющихся сообщений и просто переместить сообщение в topi c, позволяя службам, которые изначально успешно обработали сообщение, просто игнорировать его. Это то, что я планирую сделать в любом случае, но я надеюсь получить некоторые рекомендации по обработке ошибок с помощью MassTransit.

Любая помощь будет принята с благодарностью.

Ответы [ 2 ]

0 голосов
/ 11 июля 2020

Стандартным решением этой проблемы является добавление правила подписки topi c. Таким образом, всякий раз, когда вы хотите повторно отправить сообщения, в эти подписки будет добавлено только go на основе фильтра, примененного к подписке. Узнайте о правилах подписки Topi c в статье здесь .

0 голосов
/ 11 июля 2020

Azure Service Bus позволяет вам настроить правила фильтрации подписок для каждой подписки, которую вы создаете для topi c (см. https://docs.microsoft.com/en-us/azure/service-bus-messaging/topic-filters).

С его помощью вы можете определить правила фильтрации для подписки, чтобы только сообщения, соответствующие указанным c правилам, помещались в очередь этого подписчика для обработки.

Эти правила фильтрации могут быть настроены из кода , с помощью шаблонов ARM, портала Azure или Azure CLI.

Что вы можете сделать, так это убедиться, что к сообщению добавлено определенное свойство сообщения , если обработка не удалось, прежде чем он был перемещен в очередь _error . Я не очень хорошо знаком с MassTransit, но думаю, что вариант будет. Или, может быть, сам MassTransit уже предоставляет такую ​​информацию о происхождении, чтобы вы могли определить по установленному свойству сообщения, в каком источнике подписки (очереди) сообщение было обработано до того, как оно было перемещено в очередь _error.

Итак, давайте рассмотрим когда ваше неудавшееся сообщение (возможно, оно не удалось из-за исключения из-за того, что база данных службы потребителей временно недоступна) перемещается в очередь _error с некоторым свойством сообщения, указывающим, где оно было обработано. Назовем это свойство « ErrorOrigin », и вы дадите ему уникальное значение, идентифицирующее определенную подписку.

Итак, в вашем случае вы можете определить правило фильтрации для каждой подписки так что только сообщения, которые либо не имеют свойства с именем « ErrorOrigin », либо, если свойство установлено, его значение должно соответствовать имени идентификатора, соответствующему этой подписке (например, « subscriptionXforEventZ")

Если вы правильно настроили правила фильтрации подписки, потребители подписки теперь будут обрабатывать все сообщения, изначально опубликованные в topi c, а также сообщения из очереди _error, которые также были опубликованы в тот топи c. Но с правилом фильтрации Azure служебная шина будет помещать эти неудачные сообщения в соответствующую очередь подписки только в том случае, если свойство сообщения « ErrorOrigin » соответствует этой подписке.

Я успешно использовал эту функцию на практике в архитектуре микросервисов. Это позволяет очень легко добавлять дополнительных подписчиков в топи c, но дает этим подписчикам возможность получать только сообщения от топи c, которые представляют интерес.

Здесь вы можете увидеть пример фильтрации подписки на Azure Service Bus, чтобы лучше понять, как это работает: https://github.com/Azure/azure-service-bus/tree/master/samples/DotNet/Microsoft.ServiceBus.Messaging/TopicFilters

Надеюсь, эта идея поможет вам решить вашу проблему c.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...