как получить лаг Кафка с помощью Java - PullRequest
0 голосов
/ 19 октября 2018

В настоящее время я разработал код, который будет отображать тему, раздел и смещение журнала.Но в настоящее время я застрял на том, как получить лаг раздела.Я знаю, что есть команда смещения kafka, которая выполняет эту функцию, но мне нужен Java-код.

public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    int i = 0;
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        System.out.printf("Topic: %s partitionID: %d log offset: %d \n", TOPIC, i, consumer.position(partitions.get(i)));
    }

    System.out.printf("CREATE CONSUMER DONE");
    consumer.close();

This is the output of my code

Что мне нужно сделать, это вывести тему, раздел, текущее смещение, смещение журнала и задержку.Как я могу получить задержку для моего кода или как я могу получить текущее смещение для моего кода.(см. изображение для необходимого вывода).

Needed output

ПРИМЕЧАНИЕ: я не могу использовать функциональность (запись foreach), потому что я не должен читать каждую запись во входном файле.

1 Ответ

0 голосов
/ 20 октября 2018

Для воспроизведения функциональности kafka-consumer-groups необходим экземпляр Consumer и AdminClient.

Сначала, используя AdminClient, вы можете вызвать listConsumerGroupOffsets(), чтобы получить списокразделы-разделы и зафиксированные смещения для определенной группы.

Затем используйте получателя, чтобы получить смещения для этих разделов.Метод, который вы использовали, неэффективен, нет необходимости назначать и искать конечное смещение.Вы можете просто позвонить endOffsets().

Этого достаточно для воспроизведения данных, содержащихся на скриншоте.

kafka-consumer-groups также использует AdminClient.describeConsumerGroups() чтобы напечатать члена группы, назначенного (если есть) каждому разделу.

...