У меня есть две разные темы для чтения и публикации обработанных данных в веб-сервис. У меня есть условие, я должен полностью прочитать сообщения из theme1 и убедиться, что нет сообщений из theme1, я должен прочитать сообщения из topic2 и обработать его. В случае, если я начинаю читать сообщения из theme2 и получаю сообщения из topic1, мне приходится приостанавливать обработку сообщений из topic2 и читать сообщения из topic1.
Мне как-то удалось это сделать с помощью KafkaListenerEndpointRegistry. Код ListnerConfig
@Bean(kafkaListenerContainerTopic1Factory)
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic1Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setIdleEventInterval(60000L);
factory.setBatchListener(true);
return factory;
}
@Bean("kafkaListenerContainerTopic2Factory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerTopic2Factory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
Код Listner
@KafkaListener(id = "first-listener", topics = "topic1", containerFactory = "kafkaListenerContainerTopic1Factory")
public void receive(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received first='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@KafkaListener(id = "second-listener", topics = "topic2", containerFactory = "kafkaListenerContaierTopic2Factory" , autoStartup="false" )
public void receiveRel(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
for (int i = 0; i < messages.size(); i++) {
LOG.info("received second='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
}
@EventListener()
public void eventHandler(ListenerContainerIdleEvent event) {
LOG.info("Inside event");
this.registry.getListenerContainer("second-listener").start();
}
Я также должен иметь возможность управлять этими темами, когда у меня запущено несколько экземпляров приложения, например, при развертывании этого кода в OpenShift я могу управлять ими в модулях.