Reactor rabbitmq AlreadyClosedException - PullRequest
       18

Reactor rabbitmq AlreadyClosedException

0 голосов
/ 05 февраля 2019

В моем проекте я использую Springboot версии 2.1.2.RELEASE, реактор-rabbitmq версии 1.0.0.RELEASE.Я создаю кролика, подписываюсь и обрабатываю сообщения вручную.Но через какое-то время это может быть через час или после 1-2 дней работы, я получаю ошибку пропуска сердцебиения, после чего канал закрывается, и я получаю "com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения; причина: java.io.IOException: "Сброс соединения по пиру", и мой получатель больше не получает сообщения.Работает только после перезапуска.

Клиент Кролик имеет connectionFactory.setAutomaticRecoveryEnabled (true);и connectionFactory.setTopologyRecoveryEnabled (true);

, поэтому он должен автоматически восстанавливаться по умолчанию, но это не работает.

public void startReceiver(int parallelism) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
    connectionFactory.setRequestedHeartbeat(10);

    Address[] addresses = {new Address("localhost")};
    ReceiverOptions receiverOptions = new ReceiverOptions()
            .connectionFactory(connectionFactory)
            .connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
            .connectionSubscriptionScheduler(Schedulers.elastic());

    Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
    receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
    .doOnSubscribe(s -> System.out.println("Receiver started."))
    .retry()
    .parallel(parallelism)
    .runOn(Schedulers.newParallel("parallel-receiver", parallelism))
    .doOnNext(d -> processMessage(d))
    .subscribe();
}

private void processMessage(AcknowledgableDelivery message) {
    try {
        //some processing
    } catch (Exception e) {
        e.printStackTrace();
    }
    message.ack();
}

Я получаю ошибки

com.rabbitmq.client.], на com.rabbitmq.client.impl.nio.NioLoop.lambda $ handleHeartbeatFailure $ 0 (NioLoop.java:273) [amqp-client-5.5.1.jar! /: 5.5.1], на java.lang.Thread.run (Thread.java:748) ~ [na: 1.8.0_181],

реактор.rabbitmq.Receiver: Отмена потребителя amq.ctag - x02OWhVo3_DPutsPQ0qDw, получающего данные теста,

pipeline.core.Exceptions $ ErrorCallbackNotImplemented: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения;Причина: java.io.IOException: Сброс соединения по одноранговому узлу. Причина: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения;причина: java.io.IOException: сброс соединения одноранговым узлом в com.rabbitmq.client.impl.AMQChannel.ensureIsOpen (AMQChannel.java:257) ~ [amqp-client-5.5.1.jar! /: 5.5.1]на com.rabbitmq.client.impl.AMQChannel.transmit (AMQChannel.java:426) ~ [amqp-client-5.5.1.jar! /: 5.5.1], на com.rabbitmq.client.impl.AMQChannel.передать (AMQChannel.java:420) ~ [amqp-client-5.5.1.jar! /: 5.5.1] по адресу com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck (RecoveryAwareChannelN.java:93) ~ [amqp-client-5.5.1.jar! /: 5.5.1], по адресу com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck (AutorecoveringChannel.java:428) ~ [amqp-client-5.5.1.jar!/:5.5.1], в реакторе.rabbitmq.AcknowledgableDelivery.ack (AcknowledgableDelivery.java:56) ~ [реактор-rabbitmq-1.0.0.RELEASE.jar! /: 1.0.0.RELEASE], в реакторе.rabbitmqAcknowledgableDelivery.ack (AcknowledgableDelivery.java:73) ~ [реактор-кролик-1.0.0.RELEASE.jar! /: 1.0.0.RELEASE],

...