Реактор проекта Кафка: выполнить действие в конце потока без блокировки - PullRequest
0 голосов
/ 02 января 2019

Я работаю над приложением, которое использует project-reactor API-интерфейсы Kafka для реактивного подключения к Kafka-brokers.Вариант использования состоит в том, что существует входная тема, которая содержит пути к файлам для обработки.Приложение считывает каждый файл, обрабатывает его, создает поток обработанных сообщений и помещает его в тему вывода.Требование заключается в том, что файл должен быть удален после того, как он был обработан, а обработанные сообщения должны быть перенесены в тему вывода.Таким образом, действие удаления должно быть выполнено после обработки каждого файла и передачи сообщения в тему вывода.

public Flux<?> flux() {
   return KafkaReceiver
   .create(receiverOptions(Collections.singleton(sourceTopic)))
   .receive()
   .flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x, 
   m.receiverOffset())))
   .as(sender::send)
   .doOnNext(m -> {
   m.correlationMetadata().acknowledge();
   deleteFile(path);
}).doOnCancel(() -> close());

}

* Метод transform () инициируетОбработка файла в пути к файлу (m.value ()) и возвращает поток сообщений.

Проблема заключается в том, что файл удаляется даже до того, как все сообщения будут отправлены в тему вывода.Поэтому в случае сбоя при повторной попытке исходный файл недоступен.

1 Ответ

0 голосов
/ 03 января 2019

Поскольку кажется, что переменная path доступна во всем конвейере (входной параметр метода?), Вы можете удалить файл в отдельном doFinally.Вам нужно будет отфильтровать по onComplete или cancel SignalType, потому что вы не хотите удалять файл в случае сбоя.

Другой вариант будет doOnComplete, если выне заинтересован в удалении файла при отмене.

...