Spring boot apllication останавливает потребную очередь rabbitmq - PullRequest
0 голосов
/ 27 апреля 2020

У меня есть приложение весенней загрузки, которое потребляет очередь rabbitmq. Приложение работает корректно в течение определенного периода времени, но внезапно перестает принимать сообщения.

Очередь велика (более 2.000.000 сообщений), и приложение распределяется как кластер в средстве ранчера. Иногда некоторые экземпляры работают, другие просто останавливаются.

Я настроил проверку работоспособности, чтобы проверить, разрешено ли потребителю принимать сообщения. Проверка работоспособности подключается к rabbitmq и отвечает количеством сообщений:

27/04/2020 16:05:48status : UP
27/04/2020 16:05:48currentMessageCount : 77372
27/04/2020 16:05:48maxMessageCount : 1000000
27/04/2020 16:05:48currentConsumerCount : 257
27/04/2020 16:05:48minConsumerCount : 1

Это мой класс конфигурации:

@Configuration
@EnableRabbit
@EnableRabbitMetrics
public class RabbitConfiguration {

    public static final String QUEUE_GENERIC_NAME = "xxxx";
    public static final String EXCHANGE_NAME = "xxxx";
    public static final String QUEUE_SPECIFIC_NAME = "xxxx";
    public static final String ROUTING_KEY = "xxxx";

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public TopicExchange appExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public RabbitAdmin rabbitAdmin(){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public Queue appQueueGeneric() {
        Queue queue = new Queue(QUEUE_GENERIC_NAME);
        queue.setAdminsThatShouldDeclare(rabbitAdmin());
        rabbitAdmin().declareQueue(queue);
        return queue;
    }

    @Bean
    public Queue appQueueSpecific() {
        Queue queue = new Queue(QUEUE_SPECIFIC_NAME);
        queue.setAdminsThatShouldDeclare(rabbitAdmin());
        rabbitAdmin().declareQueue(queue);
        return queue;
    }

    @Bean
    public RabbitQueueCheckHealthIndicator rabbitQueueCheckHealthIndicator() {
        RabbitQueueCheckHealthIndicator healthIndicator = new RabbitQueueCheckHealthIndicator();
        healthIndicator.addQueueCheck(appQueueSpecific(), 1000000, 1);
        return healthIndicator;
    }

    @Bean
    public Binding declareBindingGeneric() {
        return BindingBuilder.bind(appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
    }

    @Bean
    public Binding declareBindingSpecific() {
        return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

Конфигурация application.yml:

    rabbitmq:
        queue: msgbox
        listener:
            simple:
                concurrency: 5
                max-concurrency: 25
                retry:
                    enabled: true
        exchange: (AMQP default)
        cache:
            channel:
                size: 25
            connection:
                mode: channel
        listener:
            direct:
                prefetch: 100    

Потребитель - это класс java, аннотированный @Component, а в методе есть аннотация @RabbitListener. В этом методе я решаю бизнес. Здесь много логи c. XML Проверка схемы, проверка данных, обогащение данных и индекс на эластичном поиске.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...