Текущее окно на основе свойства - PullRequest
0 голосов
/ 10 января 2020

У меня есть поток событий, которые содержат время (Instant). Я хочу создать чанки, начиная с первого раза заданного размера (Duration). Я также хочу выдавать пустые порции, если ничего не поступало в течение установленного времени.

Пример:

Ввод: [t0, t1, t2, t3, t7, t8, t9, t10]

Вывод с размером окна 2: [[t0, t1], [t2, t3], [], [t7], [t8, t9], [t10]]

Проблема:

Я попытался создать вспомогательное средство публикации окон, которое будет генерировать моменты, в которые должно быть создано новое окно. Я поделился тем, что у меня есть ниже, для справки, но он не работает.

Код:

/**
 * Creates a [Flowable] of chunks of time. It may return empty chunks
 * if nothing happened during the set period.
 *
 * ie: [1, 2, 3, 4] -> [[1, 2], [3, 4]]
 */
fun <T : TimeAware> Flowable<T>.timeChunks(size: Duration, startAt: Instant? = null): TimeChunksFlowable<T> {
    return TimeChunksFlowable(this, size, startAt)
}

class TimeChunksFlowable<T : TimeAware> (
    private val source: Flowable<T>,
    val size: Duration,
    private var startAt: Instant? = null
) : Flowable<TimeBracketFlowable<T>>() {

    init {
        require(!size.isNegative && !size.isZero) { "Size must be positive: $size" }
    }

    override fun subscribeActual(s: Subscriber<in TimeBracketFlowable<T>>) {
        source
            .publish { it.window() }
            .subscribe(s)
    }

    private fun Flowable<T>.window(): Flowable<TimeBracketFlowable<T>> {
        val chunks = chunks()
        return window(chunks).brackets(chunks)
    }

    private fun Flowable<Flowable<T>>.brackets(starts: Flowable<Instant>): Flowable<TimeBracketFlowable<T>> {
        val bi = BiFunction { f: Flowable<T>, i: Instant -> f.timeBracket(i, i + size) }
        return zipWith(starts, bi)
    }

    /**
     * Creates a helper [Flowable] which emits an event every time we need to create a new chunk.
     */
    private fun Flowable<T>.chunks(): ChunkStartsFlowable<T> {
        return ChunkStartsFlowable(this, size, startAt)
    }

    private class ChunkStartsFlowable<T : TimeAware> (
        private val source: Flowable<T>,
        private val size: Duration,
        private val startAt: Instant?
    ) : Flowable<Instant>() {

        override fun subscribeActual(s: Subscriber<in Instant>) {

            val subscriber = InternalSubscriber<T>(size, startAt)
            source.subscribe(subscriber)
            subscriber.publisher.subscribe(s)
        }

        private class InternalSubscriber<T : TimeAware> (
            private val size: Duration,
            private var startAt: Instant?
        ) : Subscriber<T> {

            val publisher = UnicastProcessor.create<Instant>()

            override fun onSubscribe(s: Subscription) {
                publisher.onSubscribe(s)
            }

            override fun onNext(t: T) {
                var start = startAt ?: t.time

                // add empty chunks since nothing happened
                while (start + size <= t.time) {
                    publisher.onNext(start)
                    start += size
                }

                startAt = start
            }

            override fun onError(t: Throwable) {
                publisher.onError(t)
            }

            override fun onComplete() {
                startAt?.apply(publisher::onNext)
                publisher.onComplete()
            }

        }

    }
}

1 Ответ

0 голосов
/ 11 января 2020

ОБНОВЛЕНИЕ: Подумав больше об этой проблеме, я обнаружил, что буфера должно быть достаточно:

upstream.buffer(
    windowSize.toMillis(),
    windowSize.toMillis(),
    TimeUnit.MILLISECONDS);
...