Я сделал простую реализацию Rx Java interval.
fun interval(timeInMillis: Long, timeUnit: TimeUnit): Flow<Long> = flow {
var counter: Long = 0
val delayTime = when (timeUnit) {
TimeUnit.MICROSECONDS -> timeInMillis / 1000
TimeUnit.NANOSECONDS -> timeInMillis / 1_000_000
TimeUnit.SECONDS -> timeInMillis * 1000
TimeUnit.MINUTES -> 60 * timeInMillis * 1000
TimeUnit.HOURS -> 60 * 60 * timeInMillis * 1000
TimeUnit.DAYS -> 24 * 60 * 60 * timeInMillis * 1000
else -> timeInMillis
}
while (true) {
delay(delayTime)
emit(counter++)
}
}
И использовал его с launchIn, чтобы получить job
аналогично disposable
.
val coroutineScope = CoroutineScope(SupervisorJob())
val job = coroutineScope.launch {
val job = interval(1, TimeUnit.SECONDS)
.onStart {
emit(-1)
}
.onEach {
println(it)
}
.map {
"Current time $it"
}
.launchIn(coroutineScope)
}
Used другой coroutineScope
, чтобы не отменять другие jobs
, которые есть в coroutineScope
, когда функция coroutineScope.cancel()
или отмена задания, принадлежащая этому coroutineScope, отменена, задания не могут быть запущены снова.
Конечно, это ответ может быть улучшен или лучше, я не принимаю.