У меня есть приложение весенней загрузки, которое потребляет очередь rabbitmq. Приложение работает корректно в течение определенного периода времени, но внезапно перестает принимать сообщения.
Очередь велика (более 2.000.000 сообщений), и приложение распределяется как кластер в средстве ранчера. Иногда некоторые экземпляры работают, другие просто останавливаются.
Я настроил проверку работоспособности, чтобы проверить, разрешено ли потребителю принимать сообщения. Проверка работоспособности подключается к rabbitmq и отвечает количеством сообщений:
27/04/2020 16:05:48status : UP
27/04/2020 16:05:48currentMessageCount : 77372
27/04/2020 16:05:48maxMessageCount : 1000000
27/04/2020 16:05:48currentConsumerCount : 257
27/04/2020 16:05:48minConsumerCount : 1
Это мой класс конфигурации:
@Configuration
@EnableRabbit
@EnableRabbitMetrics
public class RabbitConfiguration {
public static final String QUEUE_GENERIC_NAME = "xxxx";
public static final String EXCHANGE_NAME = "xxxx";
public static final String QUEUE_SPECIFIC_NAME = "xxxx";
public static final String ROUTING_KEY = "xxxx";
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public TopicExchange appExchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public RabbitAdmin rabbitAdmin(){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory);
}
@Bean
public Queue appQueueGeneric() {
Queue queue = new Queue(QUEUE_GENERIC_NAME);
queue.setAdminsThatShouldDeclare(rabbitAdmin());
rabbitAdmin().declareQueue(queue);
return queue;
}
@Bean
public Queue appQueueSpecific() {
Queue queue = new Queue(QUEUE_SPECIFIC_NAME);
queue.setAdminsThatShouldDeclare(rabbitAdmin());
rabbitAdmin().declareQueue(queue);
return queue;
}
@Bean
public RabbitQueueCheckHealthIndicator rabbitQueueCheckHealthIndicator() {
RabbitQueueCheckHealthIndicator healthIndicator = new RabbitQueueCheckHealthIndicator();
healthIndicator.addQueueCheck(appQueueSpecific(), 1000000, 1);
return healthIndicator;
}
@Bean
public Binding declareBindingGeneric() {
return BindingBuilder.bind(appQueueGeneric()).to(appExchange()).with(ROUTING_KEY);
}
@Bean
public Binding declareBindingSpecific() {
return BindingBuilder.bind(appQueueSpecific()).to(appExchange()).with(ROUTING_KEY);
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}
Конфигурация application.yml:
rabbitmq:
queue: msgbox
listener:
simple:
concurrency: 5
max-concurrency: 25
retry:
enabled: true
exchange: (AMQP default)
cache:
channel:
size: 25
connection:
mode: channel
listener:
direct:
prefetch: 100
Потребитель - это класс java, аннотированный @Component, а в методе есть аннотация @RabbitListener. В этом методе я решаю бизнес. Здесь много логи c. XML Проверка схемы, проверка данных, обогащение данных и индекс на эластичном поиске.