В нереактивном мире следующий фрагмент кода не является чем-то особенным:
interface Enhancer {
Result enhance(Result result);
}
Result result = Result.empty();
result = fooEnhancer.enhance(result);
result = barEnhancer.enhance(result);
result = bazEnhancer.enhance(result);
Существует три различных реализации Enhancer, которые берут экземпляр Result, улучшают его и возвращают улучшенный результат.Давайте предположим, что порядок вызовов энхансера имеет значение.
А что, если эти методы будут заменены реактивными вариантами, возвращающими Flux<Result>
?Поскольку методы зависят от результата (ов) предыдущего метода, мы не можем использовать combineLatest
здесь.
Возможное решение может быть:
Flux.just(Result.empty())
.switchMap(result -> first(result)
.switchMap(result -> second(result)
.switchMap(result -> third(result))))
.subscribe(result -> doSomethingWith(result));
Обратите внимание, что switchMap
звонки являются вложенными.Поскольку нас интересует только конечный результат, мы разрешаем switchMap
переключаться на следующий поток, как только в предшествующих потоках появляются новые события.
Теперь давайте попробуем сделать это с динамическим числом потоков.Не реактивный (без потоков), в этом опять ничего особенного:
List<Enhancer> enhancers = <ordered list of different Enhancer impls>;
Result result = Result.empty();
for (Enhancer enhancer : enhancers) {
result = enhancer.enhance(result);
}
Но как я могу обобщить приведенный выше реактивный пример с тремя потоками, чтобы иметь дело с произвольным числом потоков?