Как вы можете создать таймер, который работает на список в Rx? - PullRequest
0 голосов
/ 06 июня 2018

Я хочу найти весь список элементов, которые будут найдены до того, как я завершу, и если весь этот список не найден, то должно быть выброшено исключение (тайм-аут или пользовательский).Подобно встроенному в Observable.timer (), но вместо того, чтобы проходить тест после первого элемента, я хочу, чтобы он требовал, чтобы все элементы в списке были найдены.

Вот пример.Допустим, у меня есть тестовая функция, которая испускает Observable .Это выглядит так:

var emittedList: List<String?> = listOf(null, "202", "302", "400")

data class FoundNumber(val numberId: String?)

fun scanNumbers(): Observable<FoundNumber> = Observable
    .intervalRange(0, 
                   emittedList.size.toLong(), 
                   0, 
                   1, 
                   TimeUnit.SECONDS).map { index -> 
                     FoundNumber(emittedList[index.toInt()]) }

Эта функция будет вызываться для получения чисел, которые будут сравниваться со списком ожидаемых чисел.Неважно, есть ли дополнительные числа из scanForNumbers, которых нет в «целевом» списке.Они просто будут проигнорированы.Примерно так:

val expectedNumbers = listOf("202", "302","999")

        scanForNumbers(expectedNumbers)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe { value -> Log.d(TAG, "Was returned a $value") }

Итак, ожидаемые числа (202, 302 и 999) не совсем совпадают с числами, которые будут выбрасываться (202, 302 и 400).Таким образом, время ожидания ДОЛЖНО произойти, но со встроенной версией Observable.timer () оно не будет превышено по времени, так как наблюдался хотя бы один элемент.

Вот то, что я хотел бы иметь,Кто-нибудь знает, как закодировать это в RxJava / RxKotlin?

fun scanForNumbers(targets: List<String>): Observable<FoundNumber> {
  val accumulator: Pair<Set<Any>, FoundNumber?> = targets.toSet() to null
    return scanNumbers()
        .SPECIAL_TIMEOUT_FOR_LIST(5, TimeUnit.SECONDS, List)
        .scan(accumulator) { acc, next ->
            val (set, previous) = acc
            val stringSet:MutableSet<String> = hashSetOf()

            set.forEach { stringSet.add(it.toString()) }

            val item = if (next.numberId in stringSet) {
                next
            } else null
            (set - next) to item       // return set and nullable item
        }
        .filter { Log.d(TAG, "Filtering on ${it.second}")
                  it.second != null }  // item not null
        .take(targets.size.toLong())         // limit to the number of items
        .map { it.second }                   // unwrap the item from the pair
        .map { FoundController(it.numberId) }  // wrap in your class
}

Как вы кодируете, надеюсь, используя RxJava / Kotlin, средство для тайм-аута в списке, как упомянуто?

1 Ответ

0 голосов
/ 07 июня 2018

Я думаю, я понял, вы хотите, чтобы тайм-аут начал отсчитываться с момента подписки, а не после того, как вы наблюдаете за элементами.

Если это то, что вам нужно, то оператор takeUntil может помочьВы:

return scanNumbers()
    .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
    .scan(accumulator) { acc, next -> ...

В этом случае таймер начнет отсчет, как только вы подпишетесь.Если основная наблюдаемая завершается раньше, тогда отлично, если нет, тогда таймер завершит основную наблюдаемую в любом случае.

Но takeUntil сам по себе не выдаст ошибку, он просто завершится.Если вам необходимо завершить с ошибкой, вы можете использовать следующую комбинацию:

return scanNumbers()
    .takeUntil(
            Observable
                    .error<Void>(new TimeoutError("timeout!"))
                    .delay(5, TimeUnit.SECONDS, true))
    .scan(accumulator) { acc, next -> ...
...