RxJava2 - подключаемая наблюдаемая - воспроизведение не всех предыдущих элементов - PullRequest
0 голосов
/ 17 ноября 2018
 List<Integer> list = new ArrayList<Integer>();
    for(int j=1;j<=3;j++)
        list.add(j);


    Observable<Integer> observable = Observable.fromIterable(list)
    .replay()
    .autoConnect();



    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer1:", ""+integer);
        }
    });


    observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
    Log.v("consumer2:", ""+integer);

}
});



    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer3:", ""+integer);
        }
    });

когда я запускаю приведенный выше код, я получаю следующий вывод:

consumer1:: 1
consumer1:: 2
consumer1:: 3
consumer2:: 1
consumer2:: 2
consumer2:: 3
consumer3:: 1
consumer3:: 2
consumer3:: 3

Я ожидал, что воспроизведение фактически "воспроизведет" всю историю, которая произошла раньше.так что я ожидал испустить все потоки, которые произошли раньше.особенно это результат, который я ожидал:

//first time nothing to replay so just do the work
consumer1:: 1
consumer1:: 2
consumer1:: 3

//replay consumer1 stream:,
consumer2:: 1
consumer2:: 2
consumer2:: 3
//already replayed now do the work
consumer2:: 1
consumer2:: 2
consumer2:: 3

//replay consumer1 stream:
consumer3:: 1
consumer3:: 2
consumer3:: 3

//replay consumer2 stream:
consumer3:: 1
consumer3:: 2
consumer3:: 3
//now do the work
consumer3:: 1
consumer3:: 2
consumer3:: 3

это то, что я ожидаю, что произойдет с повтором.Что я делаю неправильно ?это как его даже не работает сейчас, как у меня есть.

1 Ответ

0 голосов
/ 17 ноября 2018

Не знаю, зачем вам это экзотическое поведение, но вы можете повторить с постоянно увеличивающимся числом:

List<Integer> list = new ArrayList<Integer>();
for (int j = 1; j <= 3; j++) {
    list.add(j);
}


AtomicInteger count = new AtomicInteger();

Observable<Integer> observable = 
    Observable.defer(() -> {
        Observable.fromIterable(list)
        .replay()
        .autoConnect()
        .repeat(count.incrementAndGet());
    });

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer1:", ""+integer);
    }
});

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer2:", "" + integer);
    }
});

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer3:", ""+integer);
    }
});
...