Я использую функциональные интерфейсы Spring Cloud Webflux и Spring Cloud Streams для обработки моей кафки.
Если я выполняю обработку последовательно и если я убиваю приложение, он возвращает сообщение для обработки, которая работает как и ожидалось, так как нет пропадания сообщений. Однако, если я пытаюсь сделать параллельное, это кажется подтверждением для Кафки, что понятно, поскольку теперь это отдельный поток, поэтому я хочу обратиться к ручному подтверждению.
Мой код:
- application.yml (соответствующие части)
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
minPartitionCount: 2
bindings:
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
consumer:
autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
Код получателя
public Consumer<Flux<Message<String>>> receiver() throws IOException {
return (sink -> {
sink
.onBackpressureBuffer()
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe((record)->{
Flux<Action> executor = new
//Internal code which does transformation and provides a flux for execution (names changed)
IncomingMessage().process(record);
if(executor != null) {
Disposable disposable=null;
disposable= executor.subscribe(
(action)->{
try {
//Process execute does the processing on the modified data (names changed)
Process.execute(action);
Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment !=null) {
acknowledgment.acknowledge();
}
}
catch(Exception e) {
log.fatal(e.getMsg());
}
},
(e)->{
log.fatal(e.getMsg());
}
});
if(disposable != null) {
disposable.dispose();
}
}
});
});
}
Здесь строка record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
всегда дает ноль, поэтому я предполагаю, что autoCommitOffset: false
не работает, я попытался поместить приведенную ниже конфигурацию также в привязку раздел, но безрезультатно.
receiver-in-0:
binder: kafka
destination: topic-1
content-type: text/plain;charset=UTF-8
group: input-group-1
autoCommitOffset: false
Мое требование состоит в том, что, если я убью приложение, даже в параллельном сценарии оно должно продолжать читать сообщения из первого неподтвержденного сообщения.