Как обрабатывать ошибки, когда обмен RabbitMQ не существует (и сообщения отправляются через интерфейс шлюза обмена сообщениями) - PullRequest
1 голос
/ 11 февраля 2020

Я хотел бы знать, каков канонический способ обработки ошибок в следующей ситуации (код является минимальным рабочим примером):

  • Сообщения отправляются через шлюз обмена сообщениями, который определяет его defaultRequestChannel и метод @Gateway:
@MessagingGateway(name = MY_GATEWAY, defaultRequestChannel = INPUT_CHANNEL)
public interface MyGateway
{
  @Gateway
  public void sendMessage(String message);
  • Сообщения считываются с канала и отправляются через исходящий адаптер AMQP:
@Bean
public IntegrationFlow apiMutuaInputFlow()
{
  return IntegrationFlows
    .from(INPUT_CHANNEL)
    .handle(Amqp.outboundAdapter(rabbitConfig.myTemplate()))
    .get();
}
  • Конфигурация RabbitMQ является скелетной:
@Configuration
public class RabbitMqConfiguration
{
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public RabbitTemplate myTemplate()
    {
        RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
        r.setExchange(INPUT_QUEUE_NAME);
        r.setConnectionFactory(rabbitConnectionFactory);
        return r;
    }
}

Обычно я включаю bean-компонент для определения конфигурации RabbitMQ, на которую я полагаюсь (обмен, очереди и привязки), и она на самом деле работает нормально. Но во время тестирования сценария сбоя ios я обнаружил ситуацию, в которой я не знаю, как правильно обращаться с помощью Spring Integration. Шаги:

  • Удалите bean-компоненты, которые настраивают RabbitMQ
  • Запустите поток с ненастроенным, ванильным экземпляром RabbitMQ.

То, что я ожидал бы, это :

  • Сообщение не может быть доставлено, потому что обмен не найден.
  • Либо я найду какой-нибудь способ получить исключение из шлюза обмена сообщениями в потоке вызывающего абонента.
  • Либо я нахожу какой-либо способ перехватить эту ошибку.

Что я нахожу:

  • Сообщение не может быть доставлено, поскольку обмен не может быть найден, и действительно это сообщение об ошибке регистрируется каждый раз, когда вызывается метод @Gateway.
2020-02-11 08:18:40.746 ERROR 42778 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'my.exchange' in vhost '/', class-id=60, method-id=40)
  • Шлюз не выходит из строя, и я не нашел способа настроить его для этого (например: добавляя предложения throws к интерфейсным методам, настраивая транзакционный канал, устанавливая wait-for-confirm и confirm-timeout).
  • Я не нашел способа иначе поймать эту ошибку CachingConectionFactory (например: настройка транзакционного канала).
  • Я не нашел способа перехватить сообщение об ошибке на другом канале (указанном на errorChannel шлюза) или в Spring Integration по умолчанию errorChannel.

I Я понимаю, что такой сбой может не распространяться вверх по потоку через шлюз обмена сообщениями, задачей которого является изоляция вызывающих абонентов от API обмена сообщениями, но я определенно ожидаю, что такая ошибка будет приемлемой.

Не могли бы вы указать мне правильное направление?

Спасибо.

1 Ответ

1 голос
/ 11 февраля 2020

RabbitMQ по своей сути асинхронный c, что является одной из причин, по которой он так хорошо работает.

Однако вы можете заблокировать вызывающего абонента, включив подтверждение и возврат и установив эту опцию:

/**
 * Set to true if you want to block the calling thread until a publisher confirm has
 * been received. Requires a template configured for returns. If a confirm is not
 * received within the confirm timeout or a negative acknowledgment or returned
 * message is received, an exception will be thrown. Does not apply to the gateway
 * since it blocks awaiting the reply.
 * @param waitForConfirm true to block until the confirmation or timeout is received.
 * @since 5.2
 * @see #setConfirmTimeout(long)
 * @see #setMultiSend(boolean)
 */
public void setWaitForConfirm(boolean waitForConfirm) {
    this.waitForConfirm = waitForConfirm;
}

(С DSL .waitForConfirm(true)).

Для этого также требуется выражение подтверждения корреляции. Вот пример из одного из тестовых случаев

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return f -> f.handle(Amqp.outboundAdapter(template)
                .exchangeName("")
                .routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
                .confirmCorrelationFunction(msg -> msg)
                .waitForConfirm(true));
    }

    @Bean
    public CachingConnectionFactory cf() {
        CachingConnectionFactory ccf = new CachingConnectionFactory(
                RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
        ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        ccf.setPublisherReturns(true);
        return ccf;
    }

    @Bean
    public RabbitTemplate template(ConnectionFactory cf) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
        rabbitTemplate.setMandatory(true);               // for returns
        rabbitTemplate.setReceiveTimeout(10_000);
        return rabbitTemplate;
    }

Имейте в виду, что это значительно замедлит работу (аналогично использованию транзакций), поэтому вы можете пересмотреть, хотите ли вы делать это при каждой отправке (если только производительность не проблема).

...