Как я могу перечислить подписанных / зарегистрированных потребителей, используя Spring Kafka? - PullRequest
0 голосов
/ 01 ноября 2019

Я Spring-kafka версии 2.2.7-RELEASE, чтобы написать слушателей kafka (как потребителей), и пытаюсь понять, есть ли у нас возможность в spring-kafka напрямую перечислить всех подписанных / зарегистрированных потребителей с такими деталями, какгруппа потребителей, компенсация отставания потребителей и т. д.

1 Ответ

0 голосов
/ 01 ноября 2019

Текущая версия 2.2.x: 2.2.10.

Если вы используете Spring Boot:

@Bean
public ApplicationRunner runner(KafkaAdmin admin, ConsumerFactory<String, String> cf) {
    return args -> {
        Consumer<String, String> consumer = cf.createConsumer("group", "clientId", "");
        try (AdminClient client = AdminClient.create(admin.getConfig())) {
            Collection<ConsumerGroupListing> groups = client.listConsumerGroups()
                    .all()
                    .get(10, TimeUnit.SECONDS);
            groups.forEach(group -> {
                Map<TopicPartition, OffsetAndMetadata> map = null;
                try {
                    map = client.listConsumerGroupOffsets(group.groupId())
                            .partitionsToOffsetAndMetadata()
                            .get(10, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                catch (ExecutionException e) {
                    e.printStackTrace();
                }
                catch (TimeoutException e) {
                    e.printStackTrace();
                }
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(map.keySet());
                map.forEach((tp, off) -> {
                    System.out.println("group: " + group + " tp: " + tp
                            + " current offset: " + off.offset()
                            + " end offset: " + endOffsets.get(tp));
                });
            });
        }
        finally {
            consumer.close();
        }
    };
}

Если вы не используете Spring Boot, используйте AdminClient.create() для созданияадминистратор, создать потребителя и сделать то же самое.

...