Странное поведение Spring + Flux.У меня есть код сервера Python (использующий Flask, но это не важно, рассматривайте его как псевдокод), который представляет собой потоковый ответ:
def generate():
for row in range(0,10):
time.sleep(1)
yield json.dumps({"count": row}) + '\n'
return Response(generate(), mimetype='application/json')
При этом я имитирую обработку некоторых задач из списка и отправляю мне результатыкак только они будут готовы, вместо того, чтобы ждать, пока все будет сделано, в основном, чтобы не хранить все это в памяти сначала сервера, а затем клиента.Теперь я хочу использовать это с Spring WebClient:
Flux<Count> alerts = webClient
.post()
.uri("/testStream")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux( Count.class )
.log();
alerts.subscribe(a -> log.debug("Received count: " + a.count));
Mono<Void> mono = Mono.when(alerts);
mono.block();
log.debug("All done in method");
Вот что я получаю в журнале:
2019-07-03 18:45:08.330 DEBUG 16256 --- [ctor-http-nio-4] c.k.c.restapi.rest.Controller : Received count: 8
2019-07-03 18:45:09.323 INFO 16256 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.4 : onNext(com.ksftech.chainfacts.restapi.rest.Controller$Count@55d09f83)
2019-07-03 18:45:09.324 INFO 16256 --- [ctor-http-nio-2] reactor.Flux.MonoFlatMapMany.4 : onComplete()
2019-07-03 18:45:09.325 DEBUG 16256 --- [io-28088-exec-4] c.k.c.restapi.rest.Controller : All done in method
2019-07-03 18:45:09.331 INFO 16256 --- [ctor-http-nio-4] reactor.Flux.MonoFlatMapMany.4 : onNext(com.ksftech.chainfacts.restapi.rest.Controller$Count@da447dd)
2019-07-03 18:45:09.332 DEBUG 16256 --- [ctor-http-nio-4] c.k.c.restapi.rest.Controller : Received count: 9
2019-07-03 18:45:09.333 INFO 16256 --- [ctor-http-nio-4] reactor.Flux.MonoFlatMapMany.4 : onComplete()
Обратите внимание, как последний объект обрабатывается подпиской после mono.blockвозвращается.Я понимаю, что Reactor является асинхронным, и как только он не видит больше объектов, он освобождает Mono и вызывает мой код в подписке параллельно.Тогда планировщик - это милость посмотреть, что запускается первым.
Я придумала довольно уродливый клочок подписки с completeConsumer и использования старого доброго ожидания / уведомления.Тогда все работает нормально.Но есть ли более элегантный способ убедиться, что мой метод ожидает обработки всех элементов Flux?