Я пытаюсь использовать Azure Темы служебной шины, чтобы разрешить службам получать сообщения с помощью MassTransit. Сообщение отправляется в топи c, и каждая служба, имеющая подписку на топи c, получает копию сообщения. Каждый тип сообщения получает topi c.
Проблема в том, как поступать с ошибками после обращения к источнику исключения. Как только сообщение выходит из строя и попадает в очередь _error, я хотел бы вернуть сообщение для обработки после исправления сообщения и / или службы. Я не могу переместить сообщение из очереди _error в Topi c, потому что каждая служба в этом topi c снова получит сообщение.
Я попытался создать вторую очередь с помощью метода ReceiveEndpoint под названием _errorecovery, но это привело к тому, что очередь подписалась на topi c, что означает, что очередь _errorrecovery получает каждое сообщение, опубликованное для этого topi c.
Мне интересно, есть ли способ, которым я могу настроить очередь используя MassTransit, который будет обрабатывать только сообщения в этой очереди без добавления дополнительной подписки.
Вот моя текущая настройка для создания тем.
TEvent - это тип сообщения, а TConsumer - связанная реализация IConsumer для этого типа сообщения.
public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
{
string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));
busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
{
configurator.ConfigureConsumer(container, typeof(TConsumer));
if (!(options is null))
{
ConfigureRetry(configurator, options);
}
});
}
И для создания очереди _errorrecovery. Каждое событие также имеет связанный IConsumer специально для обработки сбойных событий.
var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
{
config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
});
Это создает очередь с именем subname_errorrecovery и Topi c, названную в честь события. У сервиса есть подписка в Topi c, но есть и _errorrecovery. Таким образом, каждый раз, когда сообщение отправляется в Topi c, потребители события и получатели ошибки получают сообщения.
Итак, я ищу способ подключить службу к очередь восстановления, а также несколько тем без этой очереди, а также подписка на все топи c.
Также очень возможно, что я поступаю неправильно. Возможно, мне нужно просто добавить проверку на наличие повторяющихся сообщений и просто переместить сообщение в topi c, позволяя службам, которые изначально успешно обработали сообщение, просто игнорировать его. Это то, что я планирую сделать в любом случае, но я надеюсь получить некоторые рекомендации по обработке ошибок с помощью MassTransit.
Любая помощь будет принята с благодарностью.