У меня есть алгоритм, основанный на времени, в некоторых ситуациях два выброса могут происходить в один и тот же момент из разных наблюдаемых.
См. Код ниже:
int moment = ...; //calculated
Observable.timer(moment, unit)
.takeUntil(messages)
.map(e -> new Action())
The messages
Наблюдаемое выполняется на Schedulers.computation()
, как таймер.
Когда сообщение приходит в одно и то же время с таймером, поведение является случайным.Иногда выдается новый Action
, иногда нет, takeUntil(messages)
имеет приоритет.
Мне нужно иметь полный контроль над этим поведением.Конечно, я могу добавить несколько миллисекунд к moment
, например, так будет всегда выполняться после messages
, но это не то, что я хочу.
Как сообщить RxJava, что messages
наблюдаемый занимаетприоритет над таймером?
ОБНОВЛЕНИЕ :
Чтобы прояснить вопрос, я добавил этот тест, который он будет проходить каждый раз, когда выполняется
//before launching the test
RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
new TestScheduler());
@Test
public void rx_precedence() {
int moment = 5;
Observable<Long> messages = Observable.timer(moment, TimeUnit.SECONDS);
TestObserver<Long> testObserver = Observable.timer(moment, TimeUnit.SECONDS)
.takeUntil(messages)
.test();
scheduler.advanceTimeTo(moment, TimeUnit.SECONDS);
testObserver.assertNoValues();
testObserver.assertTerminated();
}
Находясь в том же moment
, takeUntil
выполняется и значения не передаются.
Но на сервере, в реальном мире, когда je JVM выполняет несколько операций одновременно,timer
может иметь приоритет над takeUntil
.
Мне нужен способ сделать этот приоритет стабильным в любой среде, чтобы никакие значения не генерировались, когда messages
одновременно выдает значениеtimer
заканчивается.