RXJava - постоянное переключение наблюдаемых при наличии данных по одному из них - PullRequest
0 голосов
/ 24 апреля 2018

У меня есть следующий минимальный (не) рабочий пример:

import org.junit.Test;
import io.reactivex.Observable;
import com.google.common.util.concurrent.SettableFuture;

@Test
public void testTest() {
    final Observable<Long> initialValues = Observable.fromArray(100L, 200L, 300L);
    final SettableFuture<Long> future = SettableFuture.create();
    final Observable<Long> newValues = Observable.fromFuture(future);

    new Thread(
            () -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                future.set(400L);
            }
    ).start();

    initialValues.takeUntil(newValues).concatWith(newValues).subscribe(System.out::println, Throwable::printStackTrace);
}

Я ожидал следующий вывод:

100

200

300

но вывод был

400

Есть ли какая-то ошибка в моей логике?Я пытаюсь выполнить следующее поведение:

Пусть будет 2 наблюдаемых: A, B. Возьмите значения от A, пока B не начнут публиковать значения, затем включите исключительно B.

1 Ответ

0 голосов
/ 24 апреля 2018

fromFuture является блокирующим потоком, который подписывается на него, поэтому initialValues никогда не получит шанса на запуск из-за takeUntil, подписавшегося сначала на newValues, а затем заблокированного.Попробуйте это:

initialValues
.takeUntil(newValues.subscribeOn(Schedulers.io()))
.concatWith(newValues)
.subscribe(System.out::println, Throwable::printStackTrace);
...