Flux.subscribe заканчивается до последнего элемента в обработке - PullRequest
0 голосов
/ 03 июля 2019

Странное поведение Spring + Flux.У меня есть код сервера Python (использующий Flask, но это не важно, рассматривайте его как псевдокод), который представляет собой потоковый ответ:

def generate():
    for row in range(0,10):
        time.sleep(1)
        yield json.dumps({"count": row}) + '\n'
return Response(generate(), mimetype='application/json')

При этом я имитирую обработку некоторых задач из списка и отправляю мне результатыкак только они будут готовы, вместо того, чтобы ждать, пока все будет сделано, в основном, чтобы не хранить все это в памяти сначала сервера, а затем клиента.Теперь я хочу использовать это с Spring WebClient:

Flux<Count> alerts = webClient
        .post()
        .uri("/testStream")
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToFlux( Count.class )
        .log();
alerts.subscribe(a -> log.debug("Received count: " + a.count));
Mono<Void> mono = Mono.when(alerts);
mono.block();
log.debug("All done in method");

Вот что я получаю в журнале:

2019-07-03 18:45:08.330 DEBUG 16256 --- [ctor-http-nio-4] c.k.c.restapi.rest.Controller     : Received count: 8

2019-07-03 18:45:09.323  INFO 16256 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.4           : onNext(com.ksftech.chainfacts.restapi.rest.Controller$Count@55d09f83)

2019-07-03 18:45:09.324  INFO 16256 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.4           : onComplete()

2019-07-03 18:45:09.325 DEBUG 16256 --- [io-28088-exec-4] c.k.c.restapi.rest.Controller     : All done in method

2019-07-03 18:45:09.331  INFO 16256 --- [ctor-http-nio-4] reactor.Flux.MonoFlatMapMany.4           : onNext(com.ksftech.chainfacts.restapi.rest.Controller$Count@da447dd)

2019-07-03 18:45:09.332 DEBUG 16256 --- [ctor-http-nio-4] c.k.c.restapi.rest.Controller     : Received count: 9
2019-07-03 18:45:09.333  INFO 16256 --- [ctor-http-nio-4] reactor.Flux.MonoFlatMapMany.4           : onComplete()

Обратите внимание, как последний объект обрабатывается подпиской после mono.blockвозвращается.Я понимаю, что Reactor является асинхронным, и как только он не видит больше объектов, он освобождает Mono и вызывает мой код в подписке параллельно.Тогда планировщик - это милость посмотреть, что запускается первым.

Я придумала довольно уродливый клочок подписки с completeConsumer и использования старого доброго ожидания / уведомления.Тогда все работает нормально.Но есть ли более элегантный способ убедиться, что мой метод ожидает обработки всех элементов Flux?

1 Ответ

0 голосов
/ 05 июля 2019

ОК, я изучил эту область и понял, что Reactor предназначен для асинхронного выполнения. Если мне это нужно синхронно, я должен использовать синхронизацию. И чтобы иметь код, который выполняется после того, как все было подано для подписки, мне нужно использовать doOnComplete:

public class FluxResult {
  public boolean success = true;
  public Exception ex = null;
  public void error() {success = false;}
  public void error(Exception e) {success = false; ex = e;}

  public synchronized void waitForFluxCompletion() throws InterruptedException {
    wait();
  }

  public synchronized void notifyAboutFluxCompletion() {
    notify();
  }
}

.... // do something which returns Flux
myflux
          .doFirst(() -> {
             // initialization
          })
          .doOnError(e -> {
            log.error("Exception", e);
          })
          .doOnComplete(() -> {
            try {
              // finalization. If we were accumulating objects, now flush them
            }
            catch (Exception e) {
              log.error("Exception", e);
              flux_res.error(e);
            }
            finally {
              flux_res.notifyAboutFluxCompletion();
            }
          })
          .subscribe(str -> {
            // something which must be executed for each item
          });

А затем дождитесь сигнала объекта:

  flux_res.waitForFluxCompletion();
  if (!flux_res.success) {
    if (flux_res.ex != null) {
...