Я хочу получить rx buffer
, который сообщает мне время (начало!) Каждого окна, например:
someObservable<T>
.timedBuffer(1000, TimeUnit.MILLISECONDS)
.subscribe { it: Timed<List<T>> ->
//it.time() is the timestamp of when the relevant interval started
}
Твердая часть:
Дело в том, что он очень похож на метод buffer
, но он не дает вам отметку времени начала буфера. Это единственное, чего не хватает. Без этого мне в основном нужно реализовать буферизацию самостоятельно.
И вот тут все усложняется - Мне не удалось реализовать буферизацию на наблюдаемой «слева» / «этой», только на интервале один.
Это означает, что использование моего timedBuffer
практически возвращает «другое» / «правильное» Observable
(например, то, которое вы передадите обычному buffer
). И это проблема, как интуитивно, так и практически (вам нужно разобраться с завершением, запуском и ошибками самостоятельно!)
Что я получил до сих пор: (с проблемами, написанными выше)
private fun <L, R> Observable<L>.leftGroupJoin(right: Observable<R>): Observable<Pair<List<L>, R>> {
var thisIsCompleted = false
//TODO THINK manipulate and return the 'left'/this - auto handling of start, complete and errors
//we manipulate and return the *right* observable, so we bind the completion of 'left' to enforce the completion of 'right'
this.doOnComplete { thisIsCompleted = true } .let { thisObs ->
right.takeWhile { !thisIsCompleted }.share() .let { rightObs ->
return rightObs
.flatMapSingle { r -> //treat each 'right' as a Single
thisObs.buffer(rightObs) //buffer 'left' on each 'right' Single (until 'right' onNext())
.map { Pair(it, r) } //pair buffered 'left' with one 'right'
.first(Pair(emptyList(), r)) //should be exactly one 'right' (Single).
}
.skipWhile { it.first.isEmpty() } //bind start to 'left' - wait for the first buffer with at least one 'left'
}}
}
private fun <T> Observable<T>.timedBuffer(timeout: Long, timeUnit: TimeUnit, scheduler: Scheduler): Observable<Timed<List<T>>> {
//buffer 'this' stream on intervals
return this
.leftGroupJoin(Observable.interval(0, timeout, timeUnit, scheduler))
.map { Timed(it.first, it.second, timeUnit) }
}
Примечание:
Вообще говоря, в соответствии с моей реализацией, мне нужно найти хорошее решение для leftGroupJoin
, которое буферизует элементы 'left' / 'this' в данном 'right' / 'other' Observable
. Но я намеренно попросил timedBuffer
, так как это то, что мне нужно. Поэтому любая реализация, которая дает мне timedBuffer
, более чем достаточна, чтобы ответить на мой вопрос и потребности.
Thx!