RxJava 2: Почему PublishProcessor не может подписаться на Observable? - PullRequest
0 голосов
/ 23 апреля 2019

Я хочу реализовать довольно простую группу обеспечения доступности баз данных в RxJava.

У нас есть источник элементов:

Observable<String> itemsObservable = Observable.fromIterable(items)

Далее, я хотел бы иметьпроцессор, который будет подписываться на itemsObservable и позволит нескольким подписчикам подписываться на него.

Итак, я создал:
PublishProcessor<String> itemsProccessor = PublishProcessor.create();

К сожалению, это невозможно:
itemsObservable.subscribe(itemsProccessor);

Почему?Какой правильный API для реализации такого рода DAG?

Вот схема для демонстрации:

enter image description here

Вот моя (неудачная) попыткадля реализации этого вида DAG:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor); 

1 Ответ

3 голосов
/ 23 апреля 2019

Это потому, что PublishProcessor реализует Subscriber, а Observable метод подписки принимает Observer. Вы можете конвертировать ваш itemsObservable в Flowable, и он сделает эту работу.

    Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
    PublishProcessor<String> processor = PublishProcessor.create();
    items.toFlowable(BackpressureStrategy.BUFFER)
            .subscribe(processor);
...