Я хочу реализовать довольно простую группу обеспечения доступности баз данных в RxJava.
У нас есть источник элементов:
Observable<String> itemsObservable = Observable.fromIterable(items)
Далее, я хотел бы иметьпроцессор, который будет подписываться на itemsObservable
и позволит нескольким подписчикам подписываться на него.
Итак, я создал:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();
К сожалению, это невозможно:
itemsObservable.subscribe(itemsProccessor);
Почему?Какой правильный API для реализации такого рода DAG?
Вот схема для демонстрации:
Вот моя (неудачная) попыткадля реализации этого вида DAG:
List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);
PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());
processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor);