Как отобразить для каждого раздела в Кафка, а не для записи? - PullRequest
0 голосов
/ 18 октября 2018

В настоящее время я создаю kafka потребитель в Java, где он будет отображать только разделы (у меня есть 10 разделов) и смещения определенных topic и group id.Мой текущий код отображается для каждой записи (или для каждой строки данных) данного ввода.Если бы у меня было 10 разделов и 15 строк данных, было бы отображено 15 строк и несколько экземпляров раздела.

Вот мои настройки для потребителя:

private static Consumer<Long, String> createConsumer() {
    System.out.println("CREATE CONSUMER");
    //Configure consumer settings/properties
    final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

Вот мой код для отображения вывода:

while (noRecordsCount < giveUp)
    {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(500);

        if (consumerRecords.count() == 0)
        {
            noRecordsCount++;
            if (noRecordsCount > giveUp) break;
            else continue;
        }          

        //Stores each topic and partition to a specific array list for easier output manipulation
        consumerRecords.forEach(record -> {
            partitionrecord.add(record.partition());
            offsetrecord.add(record.offset());
            System.out.printf("Consumer Record: %s (%d, %d)"+"\n", TOPIC, record.partition(), record.offset()); 

        });

    }

Вывод кода:

Ожидаемый вывод показывает, что экземпляр для раздела не для каждой записи (имя темы, номер раздела, смещение):

Мне нужно отобразить 10 разделов, а не отображать каждую (15) запись и ее конкретную информацию (смещение, раздел, значение и т. Д.).мне нужно добавить какие-либо конкретные команды или функции в моем коде?Я новичок в переполнении стека и прошу прощения, если мой запрос длинный.

1 Ответ

0 голосов
/ 18 октября 2018

Вы всегда получите все записи после последнего смещения, с которого вы начали, и для всех разделов, назначенных экземпляру потребителя.

Если вы запускаете десять потребителей, вы должны ожидать увидеть только один раздел на экземпляр, но все же все смещения.

Нет настройки для получения только одной записи, поскольку ваши производители должны отправлять только N сообщений равномерно после того, как потребитель начнет их ждать.

Также нет гарантии упорядочения между разделами для экземпляра-потребителя, которому назначено более одного раздела


Однако вы можете использовать TreeMap или данные max-heapструктура для хранения ваших точек данных, затем циклически переберите разделы по порядку и выведите максимально смещенные смещения для каждой

Другими словами, вы в настоящее время распечатываете каждую запись, а не печатаете после всех циклов надразделы, поэтому вы получаете первый показанный вывод

Итак, нет способа делать то, что вы хотите, «в Кафке», но проблема, о которой вы спрашиваете, заключается в том, как вы пакетируете записи, когда выПолучаете их, затем сохраняете только максимум, и, наконец, где вы выводите эту информацию.

Примечание: команда GetOffsetShell уже может запрашивать наибольшее текущее смещение для всех разделов

...