Внедрение стратегии отсрочки перевода - PullRequest
0 голосов
/ 14 июля 2020

Я пытаюсь реализовать стратегию отсрочки передачи, просто используя kotlin flow.

Мне нужно получить данные от timeA к timeB

result = dataBetween(timeA - timeB)

если результат пуст, я хочу увеличить окно времени окончания с помощью экспоненциальной задержки

result = dataBetween(timeA - timeB + exponentialBackOffInDays)

Я следил за этой статьей, в которой объясняется, как подойти к этому в rxjava2.

Но застрял в точке, где flow еще не имеет takeUntil оператора .

Вы можете увидеть мою реализацию ниже.

fun main() {
    runBlocking {
        (0..8).asFlow()
            .flatMapConcat { input ->
                // To simulate a data source which fetches data based on a time-window start-date to end-date 
                // available with in that time frame.
                flow {
                    println("Input: $input")
                    if (input < 5) {
                        emit(emptyList<String>())
                    } else { // After emitting this once the flow should complete
                        emit(listOf("Available"))
                    }
                }.retryWhenThrow(DummyException(), predicate = {
                    it.isNotEmpty()
                })
            }.collect {
                //println(it)
            }
    }
}

class DummyException : Exception("Collected size is empty")

private inline fun <T> Flow<T>.retryWhenThrow(
    throwable: Throwable,
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> {
    return flow {
        collect { value ->
            if (!predicate(value)) {
                throw throwable // informing the upstream to keep emitting since the condition is met
            }
            println("Value: $value")
            emit(value)
        }
    }.catch { e ->
        if (e::class != throwable::class) throw e
    }
}


Он работает нормально, за исключением того, что даже после того, как поток имеет успешное значение, поток продолжает собирать до 8 из восходящего потока, но в идеале он должен был остановиться, когда достигнет самого 5.

Любой была бы полезна помощь в том, как я должен подойти к этому.

1 Ответ

0 голосов
/ 21 июля 2020

Возможно, это не соответствует вашей точной настройке, но вместо вызова collect вы можете просто использовать first{...} или firstOrNull{...}. Это автоматически остановит восходящие потоки после того, как элемент будет найден. Например:

flowOf(0,0,3,10)
    .flatMapConcat {
        println("creating list with $it elements")
        flow {
            val listWithElementCount = MutableList(it){ "" }  // just a list of n empty strings
            emit(listWithElementCount)
        }
    }.first { it.isNotEmpty() }

Кстати, ваша проблема звучит так, как будто обычная функция приостановки была бы лучше. Что-то вроде

suspend fun getFirstNonEmptyList(initialFrom: Long, initialTo: Long): List<Any> {
    var from = initialFrom
    var to = initialTo
    while (coroutineContext.isActive) {
        val elements = getElementsInRange(from, to) // your  "dataBetween"
        if (elements.isNotEmpty()) return elements
        val (newFrom, newTo) = nextBackoff(from, to)
        from = newFrom
        to = newTo
    }
    throw CancellationException()
}
...