Кафка: KafkaConsumer не может вытащить все записи - PullRequest
0 голосов
/ 05 июля 2019

Я довольно новый в Кафке.

Для проведения стресс-тестирования моего кластера и создания опыта эксплуатации я создал два простых приложения Java: одно, которое многократно публикует сообщения в теме (последовательность целых чисел), и другое приложение, которое загружает всю тему (все записи) и проверяет, что последовательность завершена.Ожидается, что сообщения не будут потеряны из-за операций в кластере (перезапуск узла, замена узла, перенастройка разделов разделов и т. Д.).

Раздел «последовательность» состоит из двух разделов и коэффициента репликации 3.кластер состоит из 3-х виртуальных узлов (это для целей тестирования, следовательно, они работают на одной машине).Тема настроена на сохранение всех сообщений (retention.ms установлено на -1)

В настоящее время у меня есть две проблемы, которые мне трудно выяснить:

  1. ЕслиЯ использую bin/kafka-console-consumer.sh --bootstrap-server kafka-test-server:9090,kafka-test-server:9091,kafka-test-server:9092 --topic sequence --from-beginning Я вижу ВСЕ сообщения (даже не упорядоченные, как ожидалось), загруженные на консоль.С другой стороны, если я использую написанное мной приложение-потребитель, я вижу разные результаты, загружаемые в каждом цикле: https://i.stack.imgur.com/tMK10.png - В выводе консоли первая строка после делителя - это вызов records.partitions()следовательно, записи только иногда извлекаются из обоих разделов.Почему и почему java-приложение не ведет себя как bin/kafka-console-consumer.sh?

  2. Когда тема становится большой, bin/kafka-console-consumer.sh по-прежнему может показывать все сообщения, в то время как приложение можетзагрузить только около 18'000 сообщений.Я попытался поиграть с различными конфигурациями на стороне потребителя, но без прогресса.Опять же, вопрос в том, почему есть разница?

Заранее благодарен за любую подсказку!

Вот для справки.Обсуждались два приложения:

package ch.demo.toys;

import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SequenceProducer {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();

        properties.load(new FileInputStream("toy.properties"));

        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1");
        properties.put("retries", "3");
        properties.put("compression.type", "snappy");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);

        for (Integer sequence_i = 0; true; sequence_i++) {
            try(Producer<Integer, String> producer = new KafkaProducer<>(properties)) {

                ProducerRecord<Integer, String> record = new ProducerRecord<>("sequence", sequence_i, "Sequence number: " + String.valueOf(sequence_i));

                Future<RecordMetadata> sendFuture = producer.send(record, (metadata, exception) -> {

                    System.out.println("Adding " + record.key() + " to partition " + metadata.partition());

                    if (exception != null) {
                        exception.printStackTrace();
                    }
                });
            }

            Thread.sleep(200);
        }
    }
}
package ch.demo.toys;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CarthusianConsumer {
    private static Properties getProperties() throws Exception {
        Properties properties = new Properties();
        properties.load(new FileInputStream("toy.properties"));

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,    org.apache.kafka.common.serialization.IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  org.apache.kafka.common.serialization.StringDeserializer.class);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,          Integer.MAX_VALUE);
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,        60 * 1000);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,                  "carthusian-consumer");
        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,         60 * 1000);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,        false);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,         "earliest");
        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,           1024 * 1024 * 1024);

        return properties;
    }

    private static boolean checkConsistency(List<Integer> sequence) {
        Collections.sort(sequence);

        Iterator<Integer> iterator = sequence.iterator();

        int control = 0;
        while(iterator.hasNext()) {
            int value = iterator.next();
            if (value != control) {
                System.out.println("");
                System.out.println("Gap found:");
                System.out.println("\tSequence: " + value);
                System.out.println("\tControl: " + control);
                return false;
            }

            control++;
        }

        System.out.print(".");
        return true;
    }

    public static void main(String[] args) throws Exception {

        // Step 1: create a base consumer object
        Consumer<Integer, String> consumer = new KafkaConsumer<>(getProperties());

        // Step 2: load topic configuration and build list of TopicPartitons
        List<TopicPartition> topicPartitions = consumer
                .partitionsFor("sequence")
                .stream()
                .parallel()
                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                .collect(Collectors.toList());


        while (true) {
            List<Integer> sequence = new ArrayList<>();

            for (TopicPartition topicPartition : topicPartitions) {

                // Step 3. specify the topic-partition to "read" from
                // System.out.println("Partition specified: " + topicPartition);
                consumer.assign(Arrays.asList(topicPartition));

                // Step 4. set offset at the beginning
                consumer.seekToBeginning(Arrays.asList(topicPartition));

                // Step 5. get all records from topic-partition
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

                // System.out.println("\tCount: " + records.count());
                // System.out.println("\tPartitions: " + records.partitions());

                records.forEach(record -> { sequence.add(record.key()); });
            }


            System.out.println(sequence.size());

            checkConsistency(sequence);

            Thread.sleep(2500);
        }
    }
}

Ответы [ 2 ]

0 голосов
/ 05 июля 2019

Спасибо, Микаэль-Мезон, вот мой ответ:

О продюсере: спасибо за комментарий.Я признаю, что взял пример из книги и изменил его напрямую, без учета производительности.

Для потребителя: как упоминалось в комментариях выше, подписка была первой попыткой, которая, к сожалению, дала тот же результат, который описан в моем вопросе: результатыиз отдельных разделов, и только в редких случаях из обоих разделов в одном вызове.Я также хотел бы понять причины этого, по-видимому, случайного поведения!

Еще о потребителе: я перематываю к началу темы в каждом цикле, потому что цель состоит в том, чтобы постоянно проверять, чтобы последовательность не нарушалась(следовательно, сообщения не теряются).На каждом цикле я загружаю все сообщения и проверяю их.

Поскольку один вызов на основе подписки на тему приводил к случайному поведению (не уверен, когда возвращается полное содержание темы);Мне приходилось считывать данные с каждого отдельного раздела и вручную присоединяться к спискам записей, прежде чем проверять их - это не то, что я хотел сделать изначально!

Не верны ли мои подходы?

0 голосов
/ 05 июля 2019

Есть несколько вещей, которые вы должны изменить в логике своих клиентов.

Производитель:

Вы создаете нового продюсера для каждой отправляемой вами записи. Это ужасно с точки зрения производительности, так как каждый продюсер запускает первую загрузку перед отправкой записи. Также, поскольку каждый производитель отправляет одну запись, пакетирование не происходит. Наконец, сжатие на одной записи также не существует.

Сначала вы должны создать Producer и использовать его для отправки всех записей, т.е. вывести создание из цикла, что-то вроде:

try (Producer<Integer, String> producer = new KafkaProducer<>(properties)) {
    for (int sequence_i = 18310; true; sequence_i++) {

        ProducerRecord<Integer, String> record = new ProducerRecord<>("sequence", sequence_i, "Sequence number: " + String.valueOf(sequence_i));

        producer.send(record, (metadata, exception) -> {

            System.out.println("Adding " + record.key() + " to partition " + metadata.partition());

            if (exception != null) {
                exception.printStackTrace();
            }
        });
        Thread.sleep(200L);
    }
}

Потребитель:

На каждой итерации цикла for вы изменяете присваивание и возвращаетесь к началу раздела, поэтому в лучшем случае вы будете повторять одни и те же сообщения каждый раз!

Для начала вам, вероятно, следует использовать subscribe() API (например, kafka-console-consumer.sh), чтобы вам не приходилось возиться с разделами. Например:

try (Consumer<Integer, String> consumer = new KafkaConsumer<>(properties)) {

    consumer.subscribe(Collections.singletonList("topic"));

    while (true) {
        List<Integer> sequence = new ArrayList<>();

        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1L));
        records.forEach(record -> {
            sequence.add(record.key());
        });

        System.out.println(sequence.size());
        checkConsistency(sequence);

        Thread.sleep(2500L);
    }
}
...