В моем приложении есть логика, отнимающая много времени, которая может быть запущена разными способами, скажем, автоматически или вручную пользователем.
// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()
// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }
val startsDisposable = Observable
.merge(
autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
manualStarts
)
.subscribe(syncStarts) // merge emissions of both sources into one
val syncDisposable = syncStarts
.concatMap {
longOperation()
}
.subscribe(autoStarts) // end of long operation trigger start of auto timer
Стартовые реле могут производить много выбросов.Допустим, пользователь нажимает кнопку ручного запуска, и до автоматического запуска по таймеру остается 5 секунд.Оба события приведут к запуску longOperation()
, если это было просто flatMap
.Я хочу, чтобы внутри longOperation()
работал только один поток, поэтому, если он работает сейчас и не завершен - игнорируйте начальные выбросы, в любом случае окончание приведет к перезапуску таймера.
ConcatMap
помогает мне там наполовину - он добавляет longOperation()
к «очереди», чтобы они обрабатывались по очереди, но как я мог написать это, чтобы игнорировать любые дальнейшие запуски, пока первый полностью не завершится?