Проверка здоровья потребителей Kafka - PullRequest
0 голосов
/ 14 мая 2019

Есть ли простой способ сказать, работает ли потребитель (созданный с помощью весенней загрузки и @KafkaListener) нормально? Это включает - может получить доступ и опросить брокера, назначить хотя бы один раздел и т. Д.

Я вижу, что есть способы подписаться на различные события жизненного цикла, но это, кажется, очень хрупкое решение.

Заранее спасибо!

Ответы [ 2 ]

1 голос
/ 14 мая 2019

Вы можете использовать AdminClient, чтобы получить текущий статус группы ...

@SpringBootApplication
public class So56134056Application {

    public static void main(String[] args) {
        SpringApplication.run(So56134056Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56134056", 1, (short) 1);
    }

    @KafkaListener(id = "so56134056", topics = "so56134056")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            try (AdminClient client = AdminClient.create(admin.getConfig())) {
                while (true) {
                    Map<String, ConsumerGroupDescription> map =
                            client.describeConsumerGroups(Collections.singletonList("so56134056")).all().get();
                    System.out.println(map);
                    System.in.read();
                }
            }
        };
    }

}

{so56134056=(groupId=so56134056, isSimpleConsumerGroup=false, members=(memberId=consumer-2-32a80e0a-2b8d-4519-b71d-671117e7eaf8, clientId=consumer-2, host=/127.0.0.1, assignment=(topicPartitions=so56134056-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null))}

Мы думали о разоблачении getLastPollTime() для API контейнера слушателя.

getAssignedPartitions() доступен с 2.1.3.

0 голосов
/ 21 мая 2019

Я знаю, что вы не упомянули об этом в своем посте, но остерегайтесь добавления таких элементов в проверку работоспособности, если вы затем развернете в AWS и используете такую ​​проверку работоспособности для своей среды масштабирования ELB.

Например, один из сценариев, который может произойти, это то, что ваше приложение теряет связь с Kafka - ваша проверка работоспособности становится КРАСНОЙ - и затем эластичные beanstalks начинают процесс уничтожения и повторного запуска ваших экземпляров (что будет происходить непрерывно, пока ваши экземпляры Kafka снова не станут доступны).Это может быть дорогостоящим!

Существует также более общий философский вопрос о том, должны ли проверки работоспособности «каскадных сбоев» или нет, например, kafka не работает, поэтому приложение, подключенное к kafka, утверждает, что оно не работает, следующее приложение в цепочкетакже делает то же самое и т.д.

...