Как объединить элементы из произвольного числа зависимых потоков? - PullRequest
0 голосов
/ 06 декабря 2018

В нереактивном мире следующий фрагмент кода не является чем-то особенным:

interface Enhancer {
    Result enhance(Result result);
}

Result result = Result.empty();
result = fooEnhancer.enhance(result);
result = barEnhancer.enhance(result);
result = bazEnhancer.enhance(result);

Существует три различных реализации Enhancer, которые берут экземпляр Result, улучшают его и возвращают улучшенный результат.Давайте предположим, что порядок вызовов энхансера имеет значение.

А что, если эти методы будут заменены реактивными вариантами, возвращающими Flux<Result>?Поскольку методы зависят от результата (ов) предыдущего метода, мы не можем использовать combineLatest здесь.

Возможное решение может быть:

Flux.just(Result.empty())
    .switchMap(result -> first(result)
        .switchMap(result -> second(result)
            .switchMap(result -> third(result))))
    .subscribe(result -> doSomethingWith(result));

Обратите внимание, что switchMapзвонки являются вложенными.Поскольку нас интересует только конечный результат, мы разрешаем switchMap переключаться на следующий поток, как только в предшествующих потоках появляются новые события.

Теперь давайте попробуем сделать это с динамическим числом потоков.Не реактивный (без потоков), в этом опять ничего особенного:

List<Enhancer> enhancers = <ordered list of different Enhancer impls>;
Result result = Result.empty();
for (Enhancer enhancer : enhancers) {
    result = enhancer.enhance(result);
}

Но как я могу обобщить приведенный выше реактивный пример с тремя потоками, чтобы иметь дело с произвольным числом потоков?

Ответы [ 2 ]

0 голосов
/ 10 декабря 2018

switchMap здесь неуместно.Если у вас есть List<Enhancer> к тому времени, когда Flux конвейер объявлен , почему бы не применить логику, близкую к той, что была у вас в императивном стиле:

List<Enhancer> enhancers = <ordered list of different Enhancer impls>;
Mono<Result> resultMono = Mono.just(Result.empty)
for (Enhancer enhancer : enhancers) {
    resultMono = resultMono.map(enhancer::enhance); //previousValue -> enhancer.enhance(previousValue)
}
return resultMono;

Это может дажебудет выполняться позже во время подписки для еще более динамического разрешения энхансеров, обернув весь приведенный выше код в блок Mono.defer(() -> {...}).

0 голосов
/ 07 декабря 2018

Я нашел решение с помощью рекурсии:

@FunctionalInterface
interface FluxProvider {
    Flux<Result> get(Result result);
}

// recursive method creating the final Flux
private Flux<Result> cascadingSwitchMap(Result input, List<FluxProvider> fluxProviders, int idx) {
    if (idx < fluxProviders.size()) {
        return fluxProviders.get(idx).get(input).switchMap(result -> cascadingSwitchMap(result, fluxProviders, idx + 1));
    }
    return Flux.just(input);
}

// code using the recursive method
List<FluxProvider> fluxProviders = new ArrayList<>();
fluxProviders.add(fooEnhancer::enhance);
fluxProviders.add(barEnhancer::enhance);
fluxProviders.add(bazEnhancer::enhance);

cascadingSwitchMap(Result.empty(), fluxProviders, 0)
        .subscribe(result -> doSomethingWith(result));

Но, возможно, есть более элегантное решение с использованием оператора / функции проекта-реактора.Кто-нибудь знает такую ​​особенность?На самом деле, требование не кажется таким необычным, не так ли?

...