Kafka Streams: процессор иногда обрабатывает одни и те же сообщения при перезапуске приложения - PullRequest
1 голос
/ 22 апреля 2019

В моем приложении Java у меня есть процессор Kafka.

Мой метод процесса выглядит так:

@Override
public void process(String key, String value) {
    System.out.println("In the process method, the offset is: " + context.offset());
    //Some more code
}

где контекст - это ProcessorContext из метода init.

Я запускаю приложение, и оно регистрируется:

In the process method, the offset is: 1203
In the process method, the offset is: 1204

Затем я снова запускаю приложение и получаю те же сообщения. После нескольких перезапусков приложения (или через некоторое время я не могу найти шаблон) метод процесса перестает вызываться, и я больше не получаю эти сообщения при запуске приложения.

Есть идеи, почему эти сообщения иногда обрабатываются несколько раз?

My Streams Config имеет следующие свойства:

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
props.put(StreamsConfig.STATE_DIR_CONFIG, "somedir");

EDIT

Фрагмент кода ниже показывает, как я создаю KafkaStreams:

public class KafkaStreamsProcessorBean implements SmartLifecycle {
    @Override
    public synchronized void start() {
        final KStreamBuilder builder = new KStreamBuilder();

        final KStream<String, String> debeziumStream = builder.stream("debezium.topic");
        debeziumStream.process(() -> debeziumProcessor);

        kafkaStreams = new KafkaStreams(builder, streamsConfig);
        kafkaStreams.start();
    }
}

Здесь streamsConfig - это конфигурация со свойствами, которые я показал, а debeziumProcessor - это процессор Kafka из первого фрагмента кода.

1 Ответ

4 голосов
/ 22 апреля 2019

По умолчанию гарантия обработки Kafka Streams составляет Как минимум один раз .Это означает, что сообщения могут быть обработаны повторно.

В вашем случае, даже если вы установите StreamsConfig.PROCESSING_GUARANTEE_CONFIG на StreamsConfig.EXACTLY_ONCE, вы можете увидеть те же журналы (с той же информацией о смещении) после перезапуска.

Обработкагарантия касается записи смещения и результата в тему в одной транзакции.Это не означает, что сообщение не может быть process несколько раз (вызовите Processor :: process (...) несколько раз с тем же ключом и значением).

Возможен следующий сценарий:

  • Сообщение было прочитано.
  • Processor::process(...) вызвано.
  • Приложение завершено без записи смещения.
  • После перезапуска приложение прочитаетТо же сообщение и Processor::process(...) для того же ключа и значения будут называться
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...