RxJava CombineLatest ведет себя по-разному в зависимости от порядка - PullRequest
0 голосов
/ 02 июля 2019

Я хочу объединить две наблюдаемые names и nums в класс Student - образец, который отражает последнее состояние всех этих наблюдаемых.

Здесь излучаются обе наблюдаемые,и onNext всегда печатается для nums и names.

Однако наблюдаемое возвращается из combineLatest emits, только если наблюдаемое nums размещено первым.

public class RxTrial1 {
    public static class Student{
        public String name;
        public long roll;
        public Student(String name, long roll) {
            super();
            this.name = name;
            this.roll = roll;
        }
        @Override
        public String toString() {
            return "Student [" + (name != null ? "name=" + name + ", " : "") + "roll=" + roll + "]";
        }
    }


    public static void main(String[] args) {

        Observable<String> strs = Observable.just("a", "b", "c").repeat();
        ConnectableObservable<Long> nums = Observable.interval(1, TimeUnit.SECONDS)
                                                .doOnNext( n -> System.out.println( "nums: " + n ) )
                                                .publish(); 

        Observable<String> names = nums
                                    .zipWith(strs, (n, s)->{ return s+n; })
                                    .doOnNext( s -> System.out.println( "names: " + s ) );

        Observable.combineLatest(nums, names, (s, n)->{ return new Student(n,s); })
            .subscribeOn(Schedulers.newThread())
            .doOnNext( st -> System.out.println( "combined: " + st ) )
            .subscribe( st -> System.out.println( "Student: " + st ) );




        nums.connect();


        wait10();
    }

    private static void wait10() {
        System.out.println("Start waiting");
        try {
            Thread.sleep(10_000l);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done waiting");
    }
}

Почему он излучает, только если nums ставится первым?

Работает нормально с nums сначала:

Start waiting
nums: 0
names: a0
combined: Student [name=a0, roll=0]
Student: Student [name=a0, roll=0]
nums: 1
combined: Student [name=a0, roll=1]
Student: Student [name=a0, roll=1]
names: b1
combined: Student [name=b1, roll=1]
Student: Student [name=b1, roll=1]
nums: 2
combined: Student [name=b1, roll=2]
Student: Student [name=b1, roll=2]
names: c2
combined: Student [name=c2, roll=2]
Student: Student [name=c2, roll=2]
...

Сохраняет спокойствие с names сначала:

Start waiting
nums: 0
names: a0
nums: 1
names: b1
nums: 2
names: c2
nums: 3
names: a3
...

Примечания:

  1. Работает только, если я добавлю подписку на объединенную наблюдаемую.В противном случае он отправляется в OutOfMemory через несколько минут.
  2. Даже с именами в первую очередь он начинает работать нормально, если я добавлю take (10) к обоим.
...