WebFlux: только один элемент поступает на сервер - PullRequest
0 голосов
/ 27 августа 2018

На бэкэнде я делаю:

@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public void saveProducts(@Valid @RequestBody Flux<Product> products) {
    products.subscribe(product -> log.info("product: " + product.toString()));
}

И на фронте я называю это используя:

this.targetWebClient
            .post()
            .uri(productUri)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(this.sourceWebClient
                    .get()
                    .uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
                            .queryParam("date", date)
                            .build())
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .retrieve()
                    .bodyToFlux(Product.class), Product.class)
            .exchange()
            .subscribe();

Что происходит сейчас, так это то, что у меня есть 472 продукта, которые нужно сохранить, но только один из них на самом деле экономит. Поток закрывается после первого, и я не могу понять, почему.

Если я это сделаю:

...
.retrieve()
.bodyToMono(Void.class);

вместо этого запрос даже не поступает на сервер.

Я также попытался исправить количество элементов:

.body(Flux.just(new Product("123"), new Product("321")...

И с этим также только первый прибыл.

EDIT

Я изменил код:

@PostMapping(path = "/products", consumes = 
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
    products.subscribe(product -> this.service.saveProduct(product));

    return Mono.empty();
}

и

this.targetWebClient
            .post()
            .uri(productUri)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(this.sourceWebClient
                    .get()
                    .uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
                            .queryParam("date", date)
                            .build())
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .retrieve()
                    .bodyToFlux(Product.class), Product.class)
            .exchange()
            .block();

Это привело к тому, что один продукт был сохранен дважды (потому что внутренняя конечная точка была вызвана дважды), но снова только один элемент. А также мы получили ошибку на стороне интерфейса:

IOException: Connection reset by peer

То же для:

...
.retrieve()
.bodyToMono(Void.class)
.subscribe();

Выполнение следующих действий:

this.targetWebClient
            .post()
            .uri(productUri)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .contentType(MediaType.APPLICATION_STREAM_JSON)
            .body(this.sourceWebClient
                    .get()
                    .uri(uriBuilder -> uriBuilder.path(this.sourceEndpoint + "/id")
                            .queryParam("date", date)
                            .build())
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .retrieve()
                    .bodyToFlux(Product.class), Product.class)
            .retrieve();

Приводит к тому, что серверная часть снова не вызывается вообще.

1 Ответ

0 голосов
/ 27 августа 2018

Документация Reactor говорит, что ничего не произойдет, пока вы не подпишетесь , но это не значит, что вы должны подписаться в своем коде Spring WebFlux.

Вот несколько правил, которым вы должны следовать в Spring WebFlux:

  • Если вам нужно что-то сделать реактивным образом, тип возвращаемого вами метода должен быть Mono или Flux
  • Внутри метода, возвращающего реактивную опечатку, вы никогда не должны вызывать block или subscribe, toIterable или любой другой метод, который не возвращает сам реактивный тип
  • Вы никогда не должны выполнять операции ввода-вывода, связанные с побочными эффектами DoOnXYZ, так как они не предназначены для этого, и это вызовет проблемы во время выполнения

В вашем случае ваш бэкэнд должен использовать реактивный репозиторий для сохранения ваших данных и должен выглядеть следующим образом:

@PostMapping(path = "/products", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
    return productRepository.saveAll(products).then();
}

В этом случае тип возврата Mono<Void> означает, что ваш контроллер не будет возвращать ничего в качестве тела ответа, но будет по-прежнему сигнализировать, когда завершит обработку запроса. Это может объяснить, почему вы наблюдаете такое поведение - к тому времени, когда контроллер завершит обработку запроса, все продукты не сохраняются в базе данных.

Кроме того, помните правила, указанные выше. В зависимости от того, где используется ваш targetWebClient, вызов .subscribe(); для него может не быть решением. Если это тестовый метод, который возвращает void, вы можете вызвать block для него и получить результат для проверки утверждений на нем. Если это компонентный метод, вам, вероятно, следует вернуть тип Publisher в качестве возвращаемого значения.

EDIT:

@PostMapping(path = "/products", consumes = 
MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> saveProducts(@Valid @RequestBody Flux<Product> products) {
    products.subscribe(product -> this.service.saveProduct(product));

    return Mono.empty();
}

Делать это не правильно:

  • вызов подписки отделяет обработку запроса / ответа от этой saveProduct операции. Это похоже на запуск этой обработки в другом исполнителе.
  • , возвращая Mono.empty(), сигнализирует Spring WebFlux о том, что вы сразу же завершили обработку запроса. Поэтому Spring WebFlux закроет и очистит ресурсы запроса / ответа; но ваш saveProduct процесс все еще работает и не сможет прочитать запрос, поскольку Spring WebFlux закрыл и очистил его.

Как предлагается в комментариях, вы можете обернуть блокирующие операции с помощью Reactor (даже если это не рекомендуется и могут возникнуть проблемы с производительностью) и убедиться, что вы соединяете все операции в одном реактиве трубопровод.

...