Создать интервал синхронизации в RxJava - PullRequest
0 голосов
/ 22 марта 2020

Мне нужно запустить 2 JOB с указанным c интервалом 4,8,12,16 ... секунды, а другой - 5,9,13,17 ... секунды.

Я использовал оператор Interval в Rx Java. Задание B должно выполняться после задания A. Задание B должно находиться в спящем режиме, когда задание A выполняется, и наоборот. До сих пор код выглядит ниже

var compositeDisposable = CompositeDisposable()
compositeDisposable.add(Observable.interval(0, recordIntervalPeriod, TimeUnit.MILLISECONDS)
                        .serialize()
                        .subscribe {
                            JobA()
                        })
compositeDisposable.add(Observable.interval(0, recorderStopIntervalStartTime, TimeUnit.MILLISECONDS)
                        .serialize()
                        .subscribe {
                            JobB()
                        })

Нужна помощь в следующем

1. Лучший способ достичь вышеуказанного с использованием Rx Java

2. Запустите JobA на 4 секунды, затем запустите JobB на 4 секунды и повторите процесс снова.

Ответы [ 2 ]

1 голос
/ 23 марта 2020

Я бы посоветовал вам использовать одно задание, которое выполняется каждую секунду, и каждый раз решать, какое задание вызывать, исходя из значения счетчика:

val disposable = Observable.interval(1, TimeUnit.SECONDS)
        .serialize()
        .subscribe { counter ->
            if (counter % 4 == 0L) {
                jobA()
            } else if ((counter - 1) % 4 == 0L) {
                jobB()
            }
        }

Если вы все еще хотите использовать два наблюдаемых, я думаю, это тоже будет работать:

val disposable = CompositeDisposable()
disposable.addAll(
        Observable.interval(4, TimeUnit.SECONDS)
                .subscribe {
                    jobA()
                },
        Observable.interval(4, TimeUnit.SECONDS)
                .delay(1, TimeUnit.SECONDS)
                .subscribe {
                    jobB()
                })

Отказ от ответственности: Я не использовал Rx Java много.

0 голосов
/ 23 марта 2020

Как насчет

Observable.interval(4,TimeUnit.SECONDS)
    .flatMap({
        jobA().zipWith(Observable.timer(1, TimeUnit.SECONDS) }
            .flatMap { jobB() }
    }, maxConcurrent = 1).subscribe()

Я предполагаю, что jobA() и jobB() являются наблюдаемыми в некотором роде.

Задание A должно ждать выполнения задания B, поскольку Максимальный параллелизм установлен на 1.

Задание B должно ожидать задания A или 1 секунду с начала задания A, в зависимости от того, что произойдет позднее.

...