Параллельная обработка списка <Mono <Object>> с использованием Reactive - PullRequest
0 голосов
/ 13 июня 2019

У меня есть метод, который перебирает товары из корзины и размещает заказ на них, используя placeOrder. После того, как placeOrder вызван для всех товаров в корзине, я хочу объединить и отправить один моно-объект, в котором указывается, какой заказ прошел, а какой нет

Этот код работает, но не использует параллельное выполнение placeOrder.

List<Mono<OrderResponse>> orderResponse = new ArrayList<Mono<OrderResponse>>();
        OrderCombinedResponse combinedResponse = new OrderCombinedResponse();
//placeIndividualOrder returns Mono<OrderResponse>
        session.getCartItems().forEach(cartItem ->
          orderResponse.add(placeIndividualOrder(cartItem)));

return Flux.concat(orderResponse).collectList().map(responseList -> {
            responseList.forEach(response -> {
//Do transformation to separate out failed and successful order

            });
//Return Mono<OrderCombinedResponse> object
            return combinedResponse;
        });

Я пытаюсь использовать приведенный ниже код для параллельной обработки заказов в корзине, но он не возвращает никакого ответа и просто выходит

//Return Mono<OrderCombinedResponse> object 
return Flux.fromIterable(session.getCartItems()).parallel()
//Call method to place order. This method return Mono<OrderResponse>
.map(cartItem -> placeIndividualOrder(cartItem))
.runOn(Schedulers.elastic())
//
.map(r -> {
                    r.subscribe(response -> {
                        //Do transformation to separate out failed and successful order

                    });
                    return combinedResponse;
                });

1 Ответ

0 голосов
/ 13 июня 2019

, поскольку метод placeIndivisualOrder() возвращает Mono, необходимо вызвать его с помощью .flatMap()..runOn() должно идти выше звонка до placeIndivisualOrder().Если после этого, как в приведенном выше коде, вы запускаете только следующий .map() в планировщике.Наконец, вместо того, чтобы звонить subscribe() внутри .map(), как вы, вам нужно просто позвонить .subscribe() после .flatMap():

return Flux.fromIterable(session.getCartItems()).parallel()
    .runOn(Schedulers.elastic())
    //Call method to place order. This method return Mono<OrderResponse>
    .flatMap(cartItem -> placeIndividualOrder(cartItem))
    .sibscribe(response -> {
         // do something with response
    },
    e -> {
         // catch and report error
    }) 
...