Я пытаюсь реализовать стратегию отсрочки передачи, просто используя kotlin flow
.
Мне нужно получить данные от timeA к timeB
result = dataBetween(timeA - timeB)
если результат пуст, я хочу увеличить окно времени окончания с помощью экспоненциальной задержки
result = dataBetween(timeA - timeB + exponentialBackOffInDays)
Я следил за этой статьей, в которой объясняется, как подойти к этому в rxjava2
.
Но застрял в точке, где flow
еще не имеет takeUntil
оператора .
Вы можете увидеть мою реализацию ниже.
fun main() {
runBlocking {
(0..8).asFlow()
.flatMapConcat { input ->
// To simulate a data source which fetches data based on a time-window start-date to end-date
// available with in that time frame.
flow {
println("Input: $input")
if (input < 5) {
emit(emptyList<String>())
} else { // After emitting this once the flow should complete
emit(listOf("Available"))
}
}.retryWhenThrow(DummyException(), predicate = {
it.isNotEmpty()
})
}.collect {
//println(it)
}
}
}
class DummyException : Exception("Collected size is empty")
private inline fun <T> Flow<T>.retryWhenThrow(
throwable: Throwable,
crossinline predicate: suspend (T) -> Boolean
): Flow<T> {
return flow {
collect { value ->
if (!predicate(value)) {
throw throwable // informing the upstream to keep emitting since the condition is met
}
println("Value: $value")
emit(value)
}
}.catch { e ->
if (e::class != throwable::class) throw e
}
}
Он работает нормально, за исключением того, что даже после того, как поток имеет успешное значение, поток продолжает собирать до 8
из восходящего потока, но в идеале он должен был остановиться, когда достигнет самого 5
.
Любой была бы полезна помощь в том, как я должен подойти к этому.