Я пытаюсь настроить систему, которая будет читать две разные темы Кафки - одну для живых сообщений и одну для массовых сообщений. Надежда состоит в том, что независимо от того, сколько сообщений в «массовой» теме, все в «живой» теме отдается предпочтение.
кажется , что Spring Kafka делает это из коробки - иногда.
То, что у меня есть, просто:
@KafkaListener(topics = {"sync-live", "sync-bulk"}, concurrency = "1")
И моя конфигурация:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
И иногда это будет делать именно то, что я хочу, а иногда - нет. Фактически, пару раз я видел, как он переключается с «живой» темы на «массовую», когда еще есть живые сообщения для обработки!
Есть ли способ заставить Spring Kafka читать только из второй темы, когда первая пуста?
Приветствия