Я пытаюсь написать простой пример, использующий оператор switchOnNext
. Я хочу генерировать бесконечный поток бесконечных потоков. EАч индивидуальный поток будет генерировать от 1 до бесконечности.
Использование switchOnNext
Я ожидаю, что каждая наблюдаемая будет испускать свои первые n
элементы, а затем следующий и т. Д.
Для создания наблюдаемой, которая генерирует значения из 1 до бесконечности я реализовал функцию stati c rangeInf
. Метод main
содержит логи c, которые должны печатать значения.
Однако при запуске программы подписывается только первый поток, и только его значения выводятся на консоль. Ради здравого смысла я взял наблюдаемый интервал (встроенный, а также бесконечный), но там поведение такое, как и ожидалось.
Что мне здесь не хватает?
Сначала я подумал, что это произойдет, потому что интервал находится в отдельном отдельном потоке. Но я попытался добавить .observeOn(Schedulers.computation())
в методе rangeInf
, но это, похоже, тоже не решает проблему.
Вывод с interval
> Task :Main.main()
Next observable
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
Next observable
id 115: value 1
id 115: value 1
id 115: value 1
id 115: value 1
...
Вывод с Observable.generate
> Task :Main.main()
Next observable
id 173: value 0
id 173: value 1
id 173: value 2
id 173: value 3
id 173: value 4
id 173: value 5
...
Источник
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class Main {
static Random r = new Random();
public static void main(String[] args) throws InterruptedException {
Observable<Observable<String>> inf =
Observable.interval(0, 10, TimeUnit.SECONDS)
.map(x -> {
int id = r.nextInt(1000);
return Main.rangeInf().map(v -> String.format("%d: %d", id, v));
})
.doOnNext(i -> System.out.println("Next observable"));
Observable.switchOnNext(inf)
.subscribe(System.out::println);
while(true) {
Thread.sleep(10000);
}
}
public static Observable<Integer> inf() {
Observable<Integer> inf= Observable.interval(1, TimeUnit.SECONDS)
.map(x -> (int) Math.random() * 3000 + 1);
return inf;
}
public static Observable<Integer> rangeInf() {
// Initial state.
Supplier<Integer> s = () -> 0;
// Generator.
BiFunction<Integer, Emitter<Integer>, Integer> nxt = (i, e) -> {
e.onNext(i);
delay(); // delay random amount of time.
return i + 1;
};
return Observable.generate(s, nxt);
}
public static void delay() {
int random = (int) (Math.random() * 1 + 1);
try {
Thread.sleep(random * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}