На бэкэнде я делаю:
@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public void saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> log.info("product: " + product.toString()));
}
И на фронте я называю это используя:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.exchange()
.subscribe();
Что происходит сейчас, так это то, что у меня есть 472 продукта, которые нужно сохранить, но только один из них на самом деле экономит. Поток закрывается после первого, и я не могу понять, почему.
Если я это сделаю:
...
.retrieve()
.bodyToMono(Void.class);
вместо этого запрос даже не поступает на сервер.
Я также попытался исправить количество элементов:
.body(Flux.just(new Product("123"), new Product("321")...
И с этим также только первый прибыл.
EDIT
Я изменил код:
@PostMapping(path = "/products", consumes =
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
products.subscribe(product -> this.service.saveProduct(product));
return Mono.empty();
}
и
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.exchange()
.block();
Это привело к тому, что один продукт был сохранен дважды (потому что внутренняя конечная точка была вызвана дважды), но снова только один элемент. А также мы получили ошибку на стороне интерфейса:
IOException: Connection reset by peer
То же для:
...
.retrieve()
.bodyToMono(Void.class)
.subscribe();
Выполнение следующих действий:
this.targetWebClient
.post()
.uri(productUri)
.accept(MediaType.APPLICATION_STREAM_JSON)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(this.sourceWebClient
.get()
.uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
.queryParam("date", date)
.build())
.accept(MediaType.APPLICATION_STREAM_JSON)
.retrieve()
.bodyToFlux(Product.class), Product.class)
.retrieve();
Приводит к тому, что серверная часть снова не вызывается вообще.