Я сделал слушатель, который слушает тему Kafka с String в качестве ключей и универсальным Avro-class V в качестве сообщения, как показано ниже. Это прекрасно работает и запускает MessageListener всякий раз, когда сообщение отправляется в тему.
String topic = "foo-topic" //from constructor arguments.
ConsumerFactory<String, V> consumerFactory = new DefaultKafkaConsumerFactory<>(
consumerConfig(), getKeyDeserializer(), getValueDeserializer() //methods I've defined.
);
ConcurrentMessageListenerContainer<String, V> container = new ConcurrentMessageListenerContainer<>(
consumerFactory, new ContainerProperties(topic)
);
container.setupMessageListener((MessageListener<String, V>) record -> {
// Do stuff...
});
container.start();
Теперь по вопросу. Мне нужно реализовать метод проверки работоспособности, который проверяет подключение к Кафке. Как мне сделать это для ConcurrentMessageListenerContainer? Есть ли способ, который я могу использовать, который не работает, если он не может подключиться к Кафке? Я работаю над общим вспомогательным классом и хотел бы какой-то общий способ периодически проверять соединение с Кафкой.
Edit:
После некоторых экспериментов я придумал этот метод. Он возвращает true, если тема найдена, и выдает исключение, если не может подключиться к Kafka. Это хорошо идти, что-нибудь, что я пропускаю?
private boolean checkHealth() throws InterruptedException, ExecutionException, TimeoutException {
Properties props = new Properties();
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
try (AdminClient admin = AdminClient.create(props)) {
return admin.listTopics().names().get(10, TimeUnit.SECONDS).stream().anyMatch(t -> t.equals(topic));
}
}