повторяющийся конвейер RxJava с многоадресной передачей - PullRequest
0 голосов
/ 17 декабря 2018

Я новичок в RxJava и не могу понять, как реализовать повторяющийся опрос с ConnectableObservable, когда 2 подписчика обрабатывают события в разных потоках.

У меня есть конвейер, который примерно выглядит так:

enter image description here

Я хотел бы повторить весь конвейер после задержки, аналогично решению из https://github.com/ReactiveX/RxJava/issues/448

Observable.fromCallable(() -> pollValue())
.repeatWhen(o -> o.concatMap(v -> Observable.timer(20, TimeUnit.SECONDS)));

или Значение динамической задержки с repeatWhen () .

Это прекрасно работает с обычным (не подключаемым) наблюдаемым, но не с многоадресной передачей.

Пример кода:

Работает

    final int[] i = {0};
    Observable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++));

    integerObservable
            .observeOn(Schedulers.newThread())
            .collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
            .map(StringBuilder::toString)
            .toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
            .subscribe(System.out::println);

Не работает:

    final int[] i = {0};
    ConnectableObservable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
            .publish();

    integerObservable.observeOn(Schedulers.newThread()).subscribe(System.out::println);

    integerObservable
            .observeOn(Schedulers.newThread())
            .collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
            .map(StringBuilder::toString)
            .toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
            .subscribe(System.out::println);

    integerObservable.connect();

Ответы [ 2 ]

0 голосов
/ 21 декабря 2018

Проблема во втором примере не в многоадресной рассылке.Это в операторе сбора и повторения при.

Подумайте о повторении, когда вы подключаетесь к методу onComplete как «ловушка».Этот «крючок» будет перехватывать метод onComplete и «переопределять» его, поэтому он не будет вызываться, за исключением первого раза, когда эта наблюдаемая завершена перед запуском процесса повторения.

Без вызова метода onComplete оператор сбора не знает, сколько элементов он должен собрать.Таким образом, вам придется разобраться с логикой того, как собирать предметы и где хранить их вне потока, но это будет обходной путь.

Вот пример для этого:

    List<String> test = new ArrayList<>();
    final String[] currentString = {""};

    final int[] i = {0};
    ConnectableObservable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
                    .doOnComplete(() -> {
                        test.add(currentString[0]);
                        currentString[0] = "";
                    })
                    .repeatWhen(o -> {
                        return o.concatMap(v ->
                                Observable.timer(3, TimeUnit.SECONDS)
                                .doOnComplete(() -> {
                                    test.add(currentString[0]);
                                    currentString[0] = "";
                                }));
                    })
                    .publish();

    integerObservable
            .observeOn(Schedulers.newThread())
            .subscribe(System.out::println);

    integerObservable
            .observeOn(Schedulers.newThread())
            .map((sa) -> {
                currentString[0] = currentString[0] + sa;
                System.out.println(currentString[0]);
                return sa;
            })
            .subscribe();

В этом примере мы используем метод onComplete наблюдаемой, которую мы храним таймер для сброса нашего состояния.В этом примере не учитывается вариант, когда потребители потребляют данные медленнее, чем задержка повторения (это приведет к переполнению результата из одной цепочки данных в другую).Я бы предложил использовать другие способы обработки повторяющейся части, например:

final int [] i = {0};

    ConnectableObservable<Integer> integerObservable =
            Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++)).publish();

    Observable b = integerObservable.observeOn(Schedulers.newThread());

    Observable a = integerObservable
            .observeOn(Schedulers.newThread())
            .collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
            .map(StringBuilder::toString)
            .toObservable();

    Observable
            .interval(1, TimeUnit.SECONDS)
            .doOnSubscribe((d) -> {
                a.subscribe(System.out::println);
                b.subscribe(System.out::println);

                integerObservable.connect();
            })
            .doOnNext((d) -> {
                a.subscribe(System.out::println);
                b.subscribe(System.out::println);

                integerObservable.connect();
            })
            .doOnComplete(() -> {})
            .subscribe();

Первое подключение - это первый раз выполненияи onNext вызывается каждую 1 секунду, и он заново запускает весь конвейер с начала

Надеюсь, это поможет.

0 голосов
/ 20 декабря 2018

Еще не ясно, если это то, что вы просите, но, возможно, это поможет вам

  Observable<Integer> integerObservable1;
@Override
public void run(String... args) throws Exception {

    Integer[] integers = {1, 2, 3};
    Observable<Integer> integerObservable = Observable.fromArray(integers);

    integerObservable1 = Observable.zip(integerObservable.observeOn(Schedulers.newThread()).delay(100, TimeUnit.MILLISECONDS), integerObservable.observeOn(Schedulers.newThread()).delay(200, TimeUnit.MILLISECONDS), (integer, integer2) -> {
        System.out.println(integer + " " + integer2);
        return integer;
    })
            .doOnComplete(() -> {
                integerObservable1.subscribe();
            });
    integerObservable1.subscribe();

}
...