В моем приложении Java у меня есть процессор Kafka.
Мой метод процесса выглядит так:
@Override
public void process(String key, String value) {
System.out.println("In the process method, the offset is: " + context.offset());
//Some more code
}
где контекст - это ProcessorContext из метода init.
Я запускаю приложение, и оно регистрируется:
In the process method, the offset is: 1203
In the process method, the offset is: 1204
Затем я снова запускаю приложение и получаю те же сообщения. После нескольких перезапусков приложения (или через некоторое время я не могу найти шаблон) метод процесса перестает вызываться, и я больше не получаю эти сообщения при запуске приложения.
Есть идеи, почему эти сообщения иногда обрабатываются несколько раз?
My Streams Config имеет следующие свойства:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
props.put(StreamsConfig.STATE_DIR_CONFIG, "somedir");
EDIT
Фрагмент кода ниже показывает, как я создаю KafkaStreams:
public class KafkaStreamsProcessorBean implements SmartLifecycle {
@Override
public synchronized void start() {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> debeziumStream = builder.stream("debezium.topic");
debeziumStream.process(() -> debeziumProcessor);
kafkaStreams = new KafkaStreams(builder, streamsConfig);
kafkaStreams.start();
}
}
Здесь streamsConfig - это конфигурация со свойствами, которые я показал, а debeziumProcessor - это процессор Kafka из первого фрагмента кода.