Как я могу проверить соединение слушателей Кафки с Кафкой? - PullRequest
0 голосов
/ 08 мая 2019

Я сделал слушатель, который слушает тему Kafka с String в качестве ключей и универсальным Avro-class V в качестве сообщения, как показано ниже. Это прекрасно работает и запускает MessageListener всякий раз, когда сообщение отправляется в тему.

String topic = "foo-topic" //from constructor arguments. 
ConsumerFactory<String, V> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerConfig(), getKeyDeserializer(), getValueDeserializer() //methods I've defined. 
);
ConcurrentMessageListenerContainer<String, V> container = new ConcurrentMessageListenerContainer<>(
        consumerFactory, new ContainerProperties(topic)
);
container.setupMessageListener((MessageListener<String, V>) record -> {
    // Do stuff...
});
container.start();

Теперь по вопросу. Мне нужно реализовать метод проверки работоспособности, который проверяет подключение к Кафке. Как мне сделать это для ConcurrentMessageListenerContainer? Есть ли способ, который я могу использовать, который не работает, если он не может подключиться к Кафке? Я работаю над общим вспомогательным классом и хотел бы какой-то общий способ периодически проверять соединение с Кафкой.

Edit: После некоторых экспериментов я придумал этот метод. Он возвращает true, если тема найдена, и выдает исключение, если не может подключиться к Kafka. Это хорошо идти, что-нибудь, что я пропускаю?

private boolean checkHealth() throws InterruptedException, ExecutionException, TimeoutException {
    Properties props = new Properties();
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
    try (AdminClient admin = AdminClient.create(props)) {
        return admin.listTopics().names().get(10, TimeUnit.SECONDS).stream().anyMatch(t -> t.equals(topic));
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...