У меня есть поток событий, которые содержат время (Instant
). Я хочу создать чанки, начиная с первого раза заданного размера (Duration
). Я также хочу выдавать пустые порции, если ничего не поступало в течение установленного времени.
Пример:
Ввод: [t0, t1, t2, t3, t7, t8, t9, t10]
Вывод с размером окна 2: [[t0, t1], [t2, t3], [], [t7], [t8, t9], [t10]]
Проблема:
Я попытался создать вспомогательное средство публикации окон, которое будет генерировать моменты, в которые должно быть создано новое окно. Я поделился тем, что у меня есть ниже, для справки, но он не работает.
Код:
/**
* Creates a [Flowable] of chunks of time. It may return empty chunks
* if nothing happened during the set period.
*
* ie: [1, 2, 3, 4] -> [[1, 2], [3, 4]]
*/
fun <T : TimeAware> Flowable<T>.timeChunks(size: Duration, startAt: Instant? = null): TimeChunksFlowable<T> {
return TimeChunksFlowable(this, size, startAt)
}
class TimeChunksFlowable<T : TimeAware> (
private val source: Flowable<T>,
val size: Duration,
private var startAt: Instant? = null
) : Flowable<TimeBracketFlowable<T>>() {
init {
require(!size.isNegative && !size.isZero) { "Size must be positive: $size" }
}
override fun subscribeActual(s: Subscriber<in TimeBracketFlowable<T>>) {
source
.publish { it.window() }
.subscribe(s)
}
private fun Flowable<T>.window(): Flowable<TimeBracketFlowable<T>> {
val chunks = chunks()
return window(chunks).brackets(chunks)
}
private fun Flowable<Flowable<T>>.brackets(starts: Flowable<Instant>): Flowable<TimeBracketFlowable<T>> {
val bi = BiFunction { f: Flowable<T>, i: Instant -> f.timeBracket(i, i + size) }
return zipWith(starts, bi)
}
/**
* Creates a helper [Flowable] which emits an event every time we need to create a new chunk.
*/
private fun Flowable<T>.chunks(): ChunkStartsFlowable<T> {
return ChunkStartsFlowable(this, size, startAt)
}
private class ChunkStartsFlowable<T : TimeAware> (
private val source: Flowable<T>,
private val size: Duration,
private val startAt: Instant?
) : Flowable<Instant>() {
override fun subscribeActual(s: Subscriber<in Instant>) {
val subscriber = InternalSubscriber<T>(size, startAt)
source.subscribe(subscriber)
subscriber.publisher.subscribe(s)
}
private class InternalSubscriber<T : TimeAware> (
private val size: Duration,
private var startAt: Instant?
) : Subscriber<T> {
val publisher = UnicastProcessor.create<Instant>()
override fun onSubscribe(s: Subscription) {
publisher.onSubscribe(s)
}
override fun onNext(t: T) {
var start = startAt ?: t.time
// add empty chunks since nothing happened
while (start + size <= t.time) {
publisher.onNext(start)
start += size
}
startAt = start
}
override fun onError(t: Throwable) {
publisher.onError(t)
}
override fun onComplete() {
startAt?.apply(publisher::onNext)
publisher.onComplete()
}
}
}
}