Смещение прыжков Kafka Consumer API - PullRequest
1 голос
/ 10 мая 2019

Я использую Kafka версии 2.0 и API-интерфейс пользователя java для получения сообщений из темы. Мы используем сервер Kafka с одним узлом и одним потребителем на раздел. Я заметил, что потребитель теряет некоторые сообщения. Сценарий таков: Потребительские опросы темы. Я создал One Consumer Per Thread. Выбирает сообщения и передает их обработчику для обработки сообщения. Затем он фиксирует смещения, используя семантику Kafka Consumer «как минимум один раз» для фиксации смещения Kafka. Параллельно у меня работает другой потребитель с другим идентификатором группы. В этом потребителе я просто увеличиваю счетчик сообщений и фиксирую смещение. У этого потребителя нет потери сообщений.

try {
    //kafkaConsumer.registerTopic();

    consumerThread = new Thread(() -> {
        final String topicName1 = "topic-0";
        final String topicName2 = "topic-1";
        final String topicName3 = "topic-2";
        final String topicName4 = "topic-3";

        String groupId = "group-0";
        final Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.49:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        try {
            consumer = new KafkaConsumer<>(consumerProperties);
            consumer.subscribe(Arrays.asList(topicName1, topicName2, topicName3, topicName4));
        } catch (KafkaException ke) {
            logTrace(MODULE, ke);
        }
        while (service.isServiceStateRunning()) {
            ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, byte[]> record : partitionRecords) {
                    processMessage(simpleMessage);

                }
            }
            consumer.commitSync();
        }
        kafkaConsumer.closeResource();
    }, "KAKFA_CONSUMER");

} catch (Exception e) {
}

1 Ответ

2 голосов
/ 10 мая 2019

Кажется, здесь проблема с использованием subscribe ().

Подписаться используется для подписки на темы, а не на разделы. Чтобы использовать определенные разделы, вам нужно использовать assign (). Прочитайте выписку из документации:

public void подписка (java.util.Collection themes)

Подпишитесь на данный список тем, чтобы получить динамическое назначение перегородки. Тематические подписки не являются инкрементными. Этот список будет заменить текущее назначение (если оно есть). Это невозможно объединить тему подписки с групповым управлением с ручным назначение разделов через assign (сборник). Если данный список Темы пустые, обрабатываются так же, как и отписаться (). Это сокращение для подписки (Collection, ConsumerRebalanceListener), которое использует noop слушатель. Если вам нужна способность искать конкретные смещения, вы должны предпочесть подписку (коллекция, ConsumerRebalanceListener), так как групповые перебалансировки вызовут смещения раздела для сброса. Вы также должны предоставить свой собственный слушатель, если вы делаете свое собственное управление смещением, так как слушатель дает вам возможность совершать смещения до восстановления баланса отделка.


public void assign (разделы java.util.Collection)

Вручную назначить список разделов этому потребителю. Этот интерфейс не допускает добавочного присваивания и заменит предыдущее назначение (если оно есть). Если дан список тем разделы пустые, обрабатываются так же, как и отписаться (). Руководство назначение темы с помощью этого метода не использует группу потребителей функциональность управления. Таким образом, не будет никакого перебалансирования операция срабатывает, когда членство в группе или кластер и тема изменение метаданных. Обратите внимание, что невозможно использовать оба руководства назначение раздела с назначением (сборником) и назначение группы с подписка (Collection, ConsumerRebalanceListener).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...