Правильно ли использовать Java Kafka Consumer без операции фиксации? - PullRequest
0 голосов
/ 20 марта 2019

Мне нужно прочитать множество записей от начального смещения до конечного смещения. Я использую для этого преданного потребителя Кафку. Я в порядке с хотя бы раз семантической (в случае, если данный экземпляр приложения выходит из строя, и новый экземпляр приложения повторно считывает записи из этого начального смещения).

Итак, я могу использовать такой код?

private static KafkaConsumer<Long, String> createConsumer() {

    final Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    return new KafkaConsumer<>(props);
}

public void process() {

    KafkaConsumer consumer = createConsumer();
    TopicPartition topicPartition = new TopicPartition("topic", 2);
    consumer.assign(List.of(topicPartition));

    long startOffset = 42;
    long endOffset = 100;

    consumer.seek(topicPartition, startOffset);

    boolean isRunning = true;
    while (isRunning) {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);

        for (ConsumerRecord<Long, String> record : consumerRecords) {
            if (record.offset() >= endOffset) {
                isRunning = false;
                break;
            }
        }
    }

    consumer.close();
}

Итак:

  • У меня нет commit()
  • Я отключаю auto-commit
  • У меня нет group-id

Это правильный код? Или у него есть какие-то скрытые проблемы?

1 Ответ

1 голос
/ 22 марта 2019

Да, это правильное использование, и вы не должны сталкиваться с какими-либо проблемами. Это не типичное использование потребителем Kafka, но это разрешено.

Из официального KafkaConsumer Javadoc (мои основные моменты):

https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Управление положением потребителя

В большинстве случаев потребитель просто потребляет записи от начала до конца, периодически фиксируя свою позицию (автоматически или вручную). Однако Kafka позволяет потребителю вручную контролировать свою позицию, перемещаясь вперед или назад в разделе по желанию. Это означает, что потребитель может повторно использовать более старые записи или переходить к последним записи без фактического использования промежуточных записей. Есть несколько случаев, когда ручное управление положением потребителя может быть полезным.

...

Кафка позволяет указать позицию, используя поиск (TopicPartition, long), чтобы указать новую позицию . Также доступны специальные методы для поиска самого раннего и самого последнего смещения, поддерживаемого сервером (seekToBeginning (Collection) и seekToEnd (Collection) соответственно).

...