В моем проекте я использую Springboot версии 2.1.2.RELEASE, реактор-rabbitmq версии 1.0.0.RELEASE.Я создаю кролика, подписываюсь и обрабатываю сообщения вручную.Но через какое-то время это может быть через час или после 1-2 дней работы, я получаю ошибку пропуска сердцебиения, после чего канал закрывается, и я получаю "com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения; причина: java.io.IOException: "Сброс соединения по пиру", и мой получатель больше не получает сообщения.Работает только после перезапуска.
Клиент Кролик имеет connectionFactory.setAutomaticRecoveryEnabled (true);и connectionFactory.setTopologyRecoveryEnabled (true);
, поэтому он должен автоматически восстанавливаться по умолчанию, но это не работает.
public void startReceiver(int parallelism) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setRequestedHeartbeat(10);
Address[] addresses = {new Address("localhost")};
ReceiverOptions receiverOptions = new ReceiverOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection(addresses, "receiver"))
.connectionSubscriptionScheduler(Schedulers.elastic());
Receiver receiver = RabbitFlux.createReceiver(receiverOptions);
receiver.consumeManualAck("test-data", new ConsumeOptions().qos(200))
.doOnSubscribe(s -> System.out.println("Receiver started."))
.retry()
.parallel(parallelism)
.runOn(Schedulers.newParallel("parallel-receiver", parallelism))
.doOnNext(d -> processMessage(d))
.subscribe();
}
private void processMessage(AcknowledgableDelivery message) {
try {
//some processing
} catch (Exception e) {
e.printStackTrace();
}
message.ack();
}
Я получаю ошибки
com.rabbitmq.client.], на com.rabbitmq.client.impl.nio.NioLoop.lambda $ handleHeartbeatFailure $ 0 (NioLoop.java:273) [amqp-client-5.5.1.jar! /: 5.5.1], на java.lang.Thread.run (Thread.java:748) ~ [na: 1.8.0_181],
реактор.rabbitmq.Receiver: Отмена потребителя amq.ctag - x02OWhVo3_DPutsPQ0qDw, получающего данные теста,
pipeline.core.Exceptions $ ErrorCallbackNotImplemented: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения;Причина: java.io.IOException: Сброс соединения по одноранговому узлу. Причина: com.rabbitmq.client.AlreadyClosedException: соединение уже закрыто из-за ошибки соединения;причина: java.io.IOException: сброс соединения одноранговым узлом в com.rabbitmq.client.impl.AMQChannel.ensureIsOpen (AMQChannel.java:257) ~ [amqp-client-5.5.1.jar! /: 5.5.1]на com.rabbitmq.client.impl.AMQChannel.transmit (AMQChannel.java:426) ~ [amqp-client-5.5.1.jar! /: 5.5.1], на com.rabbitmq.client.impl.AMQChannel.передать (AMQChannel.java:420) ~ [amqp-client-5.5.1.jar! /: 5.5.1] по адресу com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck (RecoveryAwareChannelN.java:93) ~ [amqp-client-5.5.1.jar! /: 5.5.1], по адресу com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck (AutorecoveringChannel.java:428) ~ [amqp-client-5.5.1.jar!/:5.5.1], в реакторе.rabbitmq.AcknowledgableDelivery.ack (AcknowledgableDelivery.java:56) ~ [реактор-rabbitmq-1.0.0.RELEASE.jar! /: 1.0.0.RELEASE], в реакторе.rabbitmqAcknowledgableDelivery.ack (AcknowledgableDelivery.java:73) ~ [реактор-кролик-1.0.0.RELEASE.jar! /: 1.0.0.RELEASE],