Конечная точка потока из бесконечного потока Java - PullRequest
0 голосов
/ 17 января 2019

У меня проблема при обработке потока, который построен из конструкции Stream.generate.

Поток Java извлекает некоторые данные из удаленного источника, поэтому я реализовал собственного поставщика, в который встроена логика выборки данных, а затем использовал его для заполнения потока.

Stream.generate(new SearchSupplier(...))

Моя идея состоит в том, чтобы обнаружить пустой список и использовать функцию Java9 takeWhile ->

Stream.generate(new SearchSupplier(this, queryBody))
            .takeWhile(either -> either.isRight() && either.get().nonEmpty())

(с использованием конструкции Vavr Either)

Флюс слоя репозитория будет тогда делать:

return Flux.fromStream (
            this.searchStream(...) //this is where the stream gets generated
        )
        .map(Either::get)
        .flatMap(Flux::fromIterable);

Слой "service" состоит из нескольких шагов преобразования потока, но сигнатура метода выглядит как Flux<JsonObject> search(...).

Наконец, уровень контроллера имеет GetMapping:

@GetMapping(produces = "application/stream+json")
public Flux search(...) {
    return searchService.search(...) //this is the Flux<JsonObject> parth
         .subscriberContext(...) //stuff I need available during processing
         .doOnComplete(() -> log.debug("DONE")); 
}

Моя проблема в том, что поток, кажется, никогда не прекращается. Например, позвонив Почтальону, вы сняли часть «Загрузка ...» в разделе ответов. Когда я прекращаю процесс из моей IDE, результаты отправляются почтальону, и я вижу то, что ожидаю. Также лямбда doOnComplete никогда не будет вызываться

Что я заметил, так это то, что если я меняю источник потока:

Flux.fromArray(...) //harcoded array of lists of jsons

вызывается лямбда doOnComplete, а также закрывается http-соединение, и результаты отображаются в почтальоне.

Есть идеи, в чем может быть проблема?

Спасибо.

1 Ответ

0 голосов
/ 24 января 2019

Вы можете создать Flux напрямую, используя код, который выглядит следующим образом. Обратите внимание, что я добавляю несколько предполагаемых методов, которые вам нужно будет реализовать в зависимости от того, как работает ваш SearchSupplier:

Flux<SearchResultType> flux = Flux.generate(
            () -> new SearchSupplier(this, queryBody),
            (supplier, sink) -> {
                SearchResultType current = supplier.next();
                if (isNotLast(current)) {
                    sink.next(current);
                } else {
                    sink.complete();
                }
                return supplier;
            },
            supplier -> anyCleanupOperations(supplier)
        );
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...