Rx Java `switchOnNext` с бесконечными потоками - PullRequest
0 голосов
/ 01 апреля 2020

Я пытаюсь написать простой пример, использующий оператор switchOnNext. Я хочу генерировать бесконечный поток бесконечных потоков. EАч индивидуальный поток будет генерировать от 1 до бесконечности.

Использование switchOnNext Я ожидаю, что каждая наблюдаемая будет испускать свои первые n элементы, а затем следующий и т. Д.

Для создания наблюдаемой, которая генерирует значения из 1 до бесконечности я реализовал функцию stati c rangeInf. Метод main содержит логи c, которые должны печатать значения.

Однако при запуске программы подписывается только первый поток, и только его значения выводятся на консоль. Ради здравого смысла я взял наблюдаемый интервал (встроенный, а также бесконечный), но там поведение такое, как и ожидалось.

Что мне здесь не хватает?

Сначала я подумал, что это произойдет, потому что интервал находится в отдельном отдельном потоке. Но я попытался добавить .observeOn(Schedulers.computation()) в методе rangeInf, но это, похоже, тоже не решает проблему.

Вывод с interval

> Task :Main.main()
Next observable
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
id 144: value 1
Next observable
id 115: value 1
id 115: value 1
id 115: value 1
id 115: value 1
...

Вывод с Observable.generate

> Task :Main.main()
Next observable
id 173: value 0
id 173: value 1
id 173: value 2
id 173: value 3
id 173: value 4
id 173: value 5
...

Источник

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Main {
    static Random r = new Random();
    public static void main(String[] args) throws InterruptedException {
        Observable<Observable<String>> inf =
                Observable.interval(0, 10, TimeUnit.SECONDS)
                        .map(x -> {
                            int id = r.nextInt(1000);
                            return Main.rangeInf().map(v -> String.format("%d: %d", id, v));
                        })
                        .doOnNext(i -> System.out.println("Next observable"));

        Observable.switchOnNext(inf)
                .subscribe(System.out::println);

        while(true) {
            Thread.sleep(10000);
        }
    }

    public static Observable<Integer> inf() {
        Observable<Integer> inf=  Observable.interval(1, TimeUnit.SECONDS)
                .map(x -> (int) Math.random() * 3000 + 1);

        return inf;

    }
    public static Observable<Integer> rangeInf() {
        // Initial state.
        Supplier<Integer> s = () -> 0;
        // Generator.
        BiFunction<Integer, Emitter<Integer>, Integer> nxt = (i, e) -> {
            e.onNext(i);
            delay(); // delay random amount of time.
            return i + 1;
        };
        return Observable.generate(s, nxt);
    }

    public static void delay() {
        int random = (int) (Math.random() * 1 + 1);
        try {
            Thread.sleep(random * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
...