Я работаю над приложением, которое использует project-reactor
API-интерфейсы Kafka для реактивного подключения к Kafka-brokers
.Вариант использования состоит в том, что существует входная тема, которая содержит пути к файлам для обработки.Приложение считывает каждый файл, обрабатывает его, создает поток обработанных сообщений и помещает его в тему вывода.Требование заключается в том, что файл должен быть удален после того, как он был обработан, а обработанные сообщения должны быть перенесены в тему вывода.Таким образом, действие удаления должно быть выполнено после обработки каждого файла и передачи сообщения в тему вывода.
public Flux<?> flux() {
return KafkaReceiver
.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x,
m.receiverOffset())))
.as(sender::send)
.doOnNext(m -> {
m.correlationMetadata().acknowledge();
deleteFile(path);
}).doOnCancel(() -> close());
}
* Метод transform () инициируетОбработка файла в пути к файлу (m.value ()) и возвращает поток сообщений.
Проблема заключается в том, что файл удаляется даже до того, как все сообщения будут отправлены в тему вывода.Поэтому в случае сбоя при повторной попытке исходный файл недоступен.