Rabbit MQ Connection получает сброс для долго работающего сообщения и входит в бесконечный цикл - PullRequest
0 голосов
/ 27 января 2020

Я использую RabbitMQ в приложении Spring Boot. Ниже моя конфигурация. При обработке сообщения, которое занимает много времени (несколько минут), соединение Rabbit MQ сбрасывается и становится ниже исключительной ситуации, после чего снова создается новое соединение и снова получается процесс получения того же сообщения. Эти действия продолжаются, и сервер загружается обработкой того же сообщения в бесконечный l oop. Эта проблема никогда не возникает, когда обработка сообщений занимает меньше времени.

Сведения об исключении:

1.[Message processing logs]

2.[AMQP Connection X.X.X.X:5672] com.rabbitmq.client.impl.ForgivingExceptionHandler:An unexpected connection driver error occured (Exception message: Connection reset)
2020-01-27 14:15:37  ERROR [AMQP Connection X.X.X.X:5672] org.springframework.amqp.rabbit.connection.CachingConnectionFactory:Channel shutdown: connection error
2020-01-27 14:15:37 ERROR [AMQP Connection X.X.X.X:5672]  org.springframework.amqp.rabbit.connection.CachingConnectionFactory:Channel shutdown: connection error

 3. [container-4]org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer:Restarting Consumer@4ce2ba84: tags=[{amq.ctag-D3cAGHJSHJJuusHJ=test-queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://testuser@X.X.X.X:567/vhost1,1), conn: Proxy@5ef6bf2 Shared Rabbit Connection: SimpleConnection@319aa07f [delegate=amqp://testuser@X.X.X.X:567/vhost1, localPort= 39626], acknowledgeMode=AUTO local queue size=0
2020-01-27 14:15:37 INFO  [container-5] org.springframework.amqp.rabbit.connection.CachingConnectionFactory:Created new connection: rabbitConnectionFactory#4517b9ea:4/SimpleConnection@55e54aa2 [delegate=amqp://testuser@X.X.X.X:567/vhost1, localPort= 39684]
2020-01-27 14:15:37 INFO  [container-5]

 4. Same message getting processed again


Вот моя конфигурация загрузки Spring:


    @Bean
    Queue queue() {
        return new Queue(Constant.QUEUE_NAME, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(Constant.EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(Constant.ROUTING_KEY);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(Constant.QUEUE_NAME);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(ReceiverService receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

Конфигурация Rabbit MQ:

rabbitmq.host=X.X.X.X
rabbitmq.port=5672
rabbitmq.username=testuser
rabbitmq.password=testpass
rabbitmq.virtual.host=vhost1
rabbitmq.concurrent.consumer=8
rabbitmq.prefetch.count=8

В чем проблема с текущей конфигурацией?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...