Rx-Java 2: Как установить приоритет между Observables? - PullRequest
0 голосов
/ 11 февраля 2019

У меня есть алгоритм, основанный на времени, в некоторых ситуациях два выброса могут происходить в один и тот же момент из разных наблюдаемых.

См. Код ниже:

   int moment = ...; //calculated
   Observable.timer(moment, unit)
    .takeUntil(messages) 
    .map(e -> new Action())

The messages Наблюдаемое выполняется на Schedulers.computation(), как таймер.

Когда сообщение приходит в одно и то же время с таймером, поведение является случайным.Иногда выдается новый Action, иногда нет, takeUntil(messages) имеет приоритет.

Мне нужно иметь полный контроль над этим поведением.Конечно, я могу добавить несколько миллисекунд к moment, например, так будет всегда выполняться после messages, но это не то, что я хочу.

Как сообщить RxJava, что messages наблюдаемый занимаетприоритет над таймером?

ОБНОВЛЕНИЕ :

Чтобы прояснить вопрос, я добавил этот тест, который он будет проходить каждый раз, когда выполняется

    //before launching the test
    RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
                        new TestScheduler());


    @Test
    public void rx_precedence() {

        int moment = 5;
        Observable<Long> messages = Observable.timer(moment, TimeUnit.SECONDS);
        TestObserver<Long> testObserver = Observable.timer(moment, TimeUnit.SECONDS)
                .takeUntil(messages)
                .test();

        scheduler.advanceTimeTo(moment, TimeUnit.SECONDS);
        testObserver.assertNoValues();
        testObserver.assertTerminated();
    }

Находясь в том же moment, takeUntil выполняется и значения не передаются.

Но на сервере, в реальном мире, когда je JVM выполняет несколько операций одновременно,timer может иметь приоритет над takeUntil.

Мне нужен способ сделать этот приоритет стабильным в любой среде, чтобы никакие значения не генерировались, когда messages одновременно выдает значениеtimer заканчивается.

1 Ответ

0 голосов
/ 11 февраля 2019

Вам нужно Observable.amb() Полагаю, - http://reactivex.io/documentation/operators/amb.html

ОБНОВЛЕНИЕ: Хорошо, теперь проблема яснее.Возможно Scheduler.single() поможет.

...