Операторы Rx.Игнорировать, пока следующий не будет выпущен - PullRequest
0 голосов
/ 17 декабря 2018

В моем приложении есть логика, отнимающая много времени, которая может быть запущена разными способами, скажем, автоматически или вручную пользователем.

// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()

// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }

val startsDisposable = Observable
        .merge(
                autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
                manualStarts
        )
        .subscribe(syncStarts) // merge emissions of both sources into one

val syncDisposable = syncStarts
        .concatMap {
            longOperation()
        }
        .subscribe(autoStarts) // end of long operation trigger start of auto timer

Стартовые реле могут производить много выбросов.Допустим, пользователь нажимает кнопку ручного запуска, и до автоматического запуска по таймеру остается 5 секунд.Оба события приведут к запуску longOperation(), если это было просто flatMap.Я хочу, чтобы внутри longOperation() работал только один поток, поэтому, если он работает сейчас и не завершен - игнорируйте начальные выбросы, в любом случае окончание приведет к перезапуску таймера.

ConcatMap помогает мне там наполовину - он добавляет longOperation() к «очереди», чтобы они обрабатывались по очереди, но как я мог написать это, чтобы игнорировать любые дальнейшие запуски, пока первый полностью не завершится?

Ответы [ 2 ]

0 голосов
/ 18 декабря 2018

Найденное решение:

val syncDisposable = syncStarts
    .concatMap {
        longOperation()
    }
    .take(1) // Complete after first longOperation() emit next item
    .repeat() // Resubscribe to this chain onCompleted so we continue to listen syncStarts ticks
    .subscribe(autoStarts) 

Хотя это работает, оно выглядит не очень чистым.Ответ Боба выглядит более логичным, хотя он требует, чтобы цепочка была Flowable.

0 голосов
/ 17 декабря 2018

Вы можете использовать flatMap() с дополнительным целочисленным аргументом для ограничения параллелизма.

syncStarts
  .onBackpressureDrop()               // 1
  .flatMap(() -> longOperation(), 1)  // 2
  ...
  1. Отбрасывать любые выбросы, которые происходят, когда flatMap() занят.
  2. Число1 - это количество подписок, которые flatMap() делает, по существу, заставляя операции быть последовательными.

Выше перечисленные функции выполняют те функции, которые вам нужны.Тем не менее, вы не указали, что хотели бы сделать после запуска longOperation(): вы хотели, чтобы сразу после этого была запущена другая операция?Если это так, вам нужно изменить обработку обратного давления, чтобы поставить в очередь не более одного выброса.

...