У меня есть следующий минимальный (не) рабочий пример:
import org.junit.Test;
import io.reactivex.Observable;
import com.google.common.util.concurrent.SettableFuture;
@Test
public void testTest() {
final Observable<Long> initialValues = Observable.fromArray(100L, 200L, 300L);
final SettableFuture<Long> future = SettableFuture.create();
final Observable<Long> newValues = Observable.fromFuture(future);
new Thread(
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
future.set(400L);
}
).start();
initialValues.takeUntil(newValues).concatWith(newValues).subscribe(System.out::println, Throwable::printStackTrace);
}
Я ожидал следующий вывод:
100
200
300
но вывод был
400
Есть ли какая-то ошибка в моей логике?Я пытаюсь выполнить следующее поведение:
Пусть будет 2 наблюдаемых: A, B. Возьмите значения от A, пока B не начнут публиковать значения, затем включите исключительно B.