Как получить идентификатор потребителя с помощью потребителя @KafkaListener Spring Boot 2 - PullRequest
2 голосов
/ 05 мая 2020

Consumers listed using bin/kafka-consumer-groups.sh utility

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer

Возможно ли получить consumer-id информацию, как в выводе команды выше, в Spring boot @KafkaListener consumer?
Я хочу добавить это идентификатор потребителя в таблицу, чтобы представить процессор, который обрабатывал данные.
Я прошел через ответ @ gary-russell на Как получить идентификатор потребителя kafka для ведения журнала , но я не вижу идентификатор потребителя появляются в журналах, назначенных разделам.

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-7, test_topic-6, test_topic-5, test_topic-4]
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer.info - my-consumer: partitions assigned: [test_topic-3, test_topic-2, test_topic-1, test_topic-0]

Я использую Spring boot 2.2.2
зависимости: spring-kafka

Ответы [ 2 ]

1 голос
/ 05 мая 2020

Я не думаю, что consumer-id доступен на клиенте; вы можете получить client-id s из metrics:

@SpringBootApplication
public class So61616543Application {

    public static void main(String[] args) {
        SpringApplication.run(So61616543Application.class, args).close();
    }


    @KafkaListener(id = "so61616543", topics = "so61616543")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61616543").partitions(1).replicas(1).build();
    }


    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            registry.getListenerContainer("so61616543").metrics()
                .forEach((clientId, metrics) -> {
                    System.out.println(clientId);
                    metrics.forEach((key, value) -> System.out.println(key + ":" + value));
                });
        };
    }

}
1 голос
/ 05 мая 2020

Вы можете использовать следующую цепочку методов, используя autowired AdminClient:

public Collection<MemberDescription> members() 
    throws InterruptedException, ExecutionException, TimeoutException 
{
  String group = "my-consumer";
  List<String> groups = Collections.singletonList(group);

  return adminClient
    .describeConsumerGroups(groups) // DescribeConsumerGroupsResult
    .describedGroups()              // Map<String, KafkaFuture<ConsumerGroupDescription>>
    .get(group)                     // KafkaFuture<ConsumerGroupDescription>
    .get(2, TimeUnit.SECONDS)       // ConsumerGroupDescription
    .members();                     // Collection<MemberDescription>
}

Возвращенная коллекция MemberDescription содержит ваш потребители. Объект предоставляет следующие «геттеры»:

Причина обертывания KafkaFuture состоит в том, чтобы поддерживать стиль асинхронного программирования способом Java -8, реализующим java.util.concurrent.Future<T>. Вы можете либо заблокировать вызов с надеждой на результат (должен быть доступен к тому времени) с помощью (.get(2, TimeUnit.SECONDS)), либо использовать правильный асинхронный способ через Java -8 Futures.

...