Делать, когда происходит любое из N событий, но используя последний результат всех других событий - PullRequest
0 голосов
/ 05 мая 2018

Допустим, есть N событий. Могу ли я что-то сделать, когда происходит событие E, где E - это событие в N событиях, но только после того, как каждое из событий произошло хотя бы один раз, и использовать последний результат для каждого события?

Я попробовал код ниже, ожидая, что результат будет таким же, как в комментариях

val o1 = PublishSubject.create<String>();
val o2 = PublishSubject.create<String>();
val o3 = PublishSubject.create<String>();

Observable.zip(o1,o2,o3,
        Function3<String, String, String, Array<String>> { t1, t2, t3 -> arrayOf(t1,t2,t3); })
        .observeOn(Schedulers.computation())
        .subscribe { t1 ->
            Log.d("so", "Result = " + t1.joinToString(" "));
        }

o1?.onNext("homer"); //o2 and o3 have not been ready.
Thread.sleep(1000);
o1?.onNext("bart");  //o2 and o3 have not been ready.
Thread.sleep(1000);
o3?.onNext("doughnut"); //o2 has not been ready.
Thread.sleep(1000);
o2?.onNext("loves"); //at this point, print "Result = bart loves doughnut". 
                     //The last value of o1 is "bart",
                     //and the last value of o3 is "doughnut"
Thread.sleep(1000);
o1?.onNext("marge"); //marge loves doughnut
Thread.sleep(1000);
o2?.onNext("hates"); //marge hates doughnut
Thread.sleep(1000);
o3?.onNext("pie");   //marge hates pie
Thread.sleep(1000);
o3?.onNext("cake");  //marge hates cake
Thread.sleep(1000);
o2?.onNext("sees");  //marge sees cake
Thread.sleep(1000);

Log.d("so", "Done");

Но фактический результат был

Result = homer loves doughnut
Result = bart hates pie
Result = marge sees cake
Done

1 Ответ

0 голосов
/ 05 мая 2018

Проблема с вашим решением состоит в том, что zip ( docs ) использует элементы только один раз. Если вы объединяете потоки [A1] и [B1, B2, B3], он будет генерировать только [(A1, B2)], а затем ждать следующего A, даже если следующие B уже доступны.

Однако оператор combineLatest ( документы ) может добиться желаемого поведения:

Observable.combineLatest(o1, o2, o3, Function3<String, String, String, String> {
    t1, t2, t3 -> arrayOf(t1, t2, t3).joinToString(" ")
}).subscribe {
    println(it)
}

Выход:

bart loves doughnut
marge loves doughnut
marge hates doughnut
marge hates pie
marge hates cake
marge sees cake
Done

В качестве общей рекомендации: проверьте все доступных операторов в ReactiveX. Графическая документация очень интуитивно понятна. В конце концов, это основные строительные блоки, которые вы хотите полностью понять, чтобы реализовать более сложное поведение.

...