Rx (RxKotlin) - timedBuffer - буфер с отметкой времени начала окна - PullRequest
0 голосов
/ 28 апреля 2019

Я хочу получить rx buffer, который сообщает мне время (начало!) Каждого окна, например:

someObservable<T>
    .timedBuffer(1000, TimeUnit.MILLISECONDS)
    .subscribe { it: Timed<List<T>> ->
        //it.time() is the timestamp of when the relevant interval started
    }  

Твердая часть:
Дело в том, что он очень похож на метод buffer, но он не дает вам отметку времени начала буфера. Это единственное, чего не хватает. Без этого мне в основном нужно реализовать буферизацию самостоятельно.
И вот тут все усложняется - Мне не удалось реализовать буферизацию на наблюдаемой «слева» / «этой», только на интервале один.
Это означает, что использование моего timedBuffer практически возвращает «другое» / «правильное» Observable (например, то, которое вы передадите обычному buffer). И это проблема, как интуитивно, так и практически (вам нужно разобраться с завершением, запуском и ошибками самостоятельно!)

Что я получил до сих пор: (с проблемами, написанными выше)

private fun <L, R> Observable<L>.leftGroupJoin(right: Observable<R>): Observable<Pair<List<L>, R>> {
    var thisIsCompleted = false

    //TODO THINK manipulate and return the 'left'/this - auto handling of start, complete and errors

    //we manipulate and return the *right* observable, so we bind the completion of 'left' to enforce the completion of 'right'
    this.doOnComplete { thisIsCompleted = true }             .let { thisObs ->
    right.takeWhile { !thisIsCompleted }.share()             .let { rightObs ->

    return rightObs
        .flatMapSingle { r ->               //treat each 'right' as a Single
            thisObs.buffer(rightObs)                //buffer 'left' on each 'right' Single (until 'right' onNext())
            .map { Pair(it, r) }                    //pair buffered 'left' with one 'right'
            .first(Pair(emptyList(), r))            //should be exactly one 'right' (Single).
        }
        .skipWhile { it.first.isEmpty() }   //bind start to 'left' - wait for the first buffer with at least one 'left'
    }}
}

private fun <T> Observable<T>.timedBuffer(timeout: Long, timeUnit: TimeUnit, scheduler: Scheduler): Observable<Timed<List<T>>> {
    //buffer 'this' stream on intervals
    return this
        .leftGroupJoin(Observable.interval(0, timeout, timeUnit, scheduler))
        .map { Timed(it.first, it.second, timeUnit) }
}  

Примечание:
Вообще говоря, в соответствии с моей реализацией, мне нужно найти хорошее решение для leftGroupJoin, которое буферизует элементы 'left' / 'this' в данном 'right' / 'other' Observable. Но я намеренно попросил timedBuffer, так как это то, что мне нужно. Поэтому любая реализация, которая дает мне timedBuffer, более чем достаточна, чтобы ответить на мой вопрос и потребности.
Thx!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...