Как манипулировать объектом, поступающим из Flux <Object>, со значением, исходящим от метода, излучающего Mono <Items>неблокирующим способом? - PullRequest
0 голосов
/ 17 января 2019

Я пытаюсь манипулировать моими объектами, полученными из Flux, с данными, полученными из Mono, где методы, излучающие Flux объекта и Mono элементов, являются разными вызовами API. Проблема в том, что я не контролирую потоки, а элементы, полученные из Mono, никогда не назначаются моему объекту, если я не намеренно блокирую () этот поток. Пожалуйста, предложите, если какой-либо неблокирующий способ возможен для этого сценария.

Я также изучил планировщики, подписаться, опубликовать, но не смог выяснить конвейер.

public Flux<Object> test {

 method1().map(obj -> {
        if (obj.getTotalItems() > 20) {
            obj.setItems(method2(obj).block());
        }
        return obj;
  });
}

Здесь method1 испускает поток объектов, полученных от попадания API.

И method2 генерирует список элементов, выбранных из другого попадания API.

Как я могу сделать весь этот поток неблокирующим?

1 Ответ

0 голосов
/ 17 января 2019

Попробуйте flatMap или concatMap

с помощью оператора flatMap вы можете сгладить подпоток в неблокирующей общедоступной

Flux<Object> test {

 method1().flatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
  });
}

flatMap позволяет выравнивать несколько потоков одновременно, поэтому в случае длительных операций вы можете использовать более эффективные элементы процесса.

Одним недостатком flatMap является то, что он не сохраняет порядок элементов, поэтому если у вас есть последовательность вышестоящих элементов, таких как [1, 2, 3, 4] с flatMap, есть вероятность, что порядок будет изменен из-за асинхронного характера подпотоков .

Чтобы сохранить порядок, вы можете использовать concatMap, который выравнивает только один поток за раз, поэтому есть гарантии, что порядок элементов выравнивания будет сохранен:

Flux<Object> test {

 method1().concatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
  });
}

Примечание

Мутация объектов таким способом - не лучшая идея, и я бы предпочел использовать неизменный объект объект-образец в реактивном программировании

...