Как можно реализовать Rx Java интервальный / бесконечный счетчик с Kotlin Flow реактивным способом? - PullRequest
2 голосов
/ 28 мая 2020

Как может интервальный аналог Rx Java interval, бесконечный счетчик с задержкой и единицей времени, реализованный с Flow реактивным способом? И как должен работать механизм отмены, поскольку единственный способ отменить поток - это отменить область действия или задание сопрограммы, и после отмены этой сопрограммы она не работает?

1 Ответ

0 голосов
/ 31 мая 2020

Я сделал простую реализацию Rx Java interval.

fun interval(timeInMillis: Long, timeUnit: TimeUnit): Flow<Long> = flow {

    var counter: Long = 0

    val delayTime = when (timeUnit) {
        TimeUnit.MICROSECONDS -> timeInMillis / 1000
        TimeUnit.NANOSECONDS -> timeInMillis / 1_000_000
        TimeUnit.SECONDS -> timeInMillis * 1000
        TimeUnit.MINUTES -> 60 * timeInMillis * 1000
        TimeUnit.HOURS -> 60 * 60 * timeInMillis * 1000
        TimeUnit.DAYS -> 24 * 60 * 60 * timeInMillis * 1000
        else -> timeInMillis
    }

    while (true) {
        delay(delayTime)
        emit(counter++)
    }

}

И использовал его с launchIn, чтобы получить job аналогично disposable.

val coroutineScope = CoroutineScope(SupervisorJob())

  val job =  coroutineScope.launch {
        val job = interval(1, TimeUnit.SECONDS)
            .onStart {
                emit(-1)
            }
            .onEach {
                println(it)
            }
            .map {
                "Current time $it"
            }
            .launchIn(coroutineScope)

    }

Used другой coroutineScope, чтобы не отменять другие jobs, которые есть в coroutineScope, когда функция coroutineScope.cancel() или отмена задания, принадлежащая этому coroutineScope, отменена, задания не могут быть запущены снова.

Конечно, это ответ может быть улучшен или лучше, я не принимаю.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...