У меня проблема при обработке потока, который построен из конструкции Stream.generate
.
Поток Java извлекает некоторые данные из удаленного источника, поэтому я реализовал собственного поставщика, в который встроена логика выборки данных, а затем использовал его для заполнения потока.
Stream.generate(new SearchSupplier(...))
Моя идея состоит в том, чтобы обнаружить пустой список и использовать функцию Java9 takeWhile
->
Stream.generate(new SearchSupplier(this, queryBody))
.takeWhile(either -> either.isRight() && either.get().nonEmpty())
(с использованием конструкции Vavr Either)
Флюс слоя репозитория будет тогда делать:
return Flux.fromStream (
this.searchStream(...) //this is where the stream gets generated
)
.map(Either::get)
.flatMap(Flux::fromIterable);
Слой "service" состоит из нескольких шагов преобразования потока, но сигнатура метода выглядит как Flux<JsonObject> search(...)
.
Наконец, уровень контроллера имеет GetMapping:
@GetMapping(produces = "application/stream+json")
public Flux search(...) {
return searchService.search(...) //this is the Flux<JsonObject> parth
.subscriberContext(...) //stuff I need available during processing
.doOnComplete(() -> log.debug("DONE"));
}
Моя проблема в том, что поток, кажется, никогда не прекращается.
Например, позвонив Почтальону, вы сняли часть «Загрузка ...» в разделе ответов. Когда я прекращаю процесс из моей IDE, результаты отправляются почтальону, и я вижу то, что ожидаю. Также лямбда doOnComplete никогда не будет вызываться
Что я заметил, так это то, что если я меняю источник потока:
Flux.fromArray(...) //harcoded array of lists of jsons
вызывается лямбда doOnComplete, а также закрывается http-соединение, и результаты отображаются в почтальоне.
Есть идеи, в чем может быть проблема?
Спасибо.