RxJava буфер / окно с пользовательскими критериями подсчета - PullRequest
0 голосов
/ 13 октября 2018

У меня есть Observable, который испускает несколько объектов, и я хочу сгруппировать эти объекты, используя операции window или buffer.Однако вместо указания параметра count для определения того, сколько объектов должно быть в окне, я хочу использовать пользовательские критерии.

Например, предположим, что наблюдаемое излучает экземпляры класса Messageкак показано ниже.

class Message(
   val int size: Int
)

Я бы хотел буферизовать или оканчивать экземпляры сообщений на основе их переменной size, а не только их количества.Например, чтобы получить окна сообщений с общим размером не более 5000.

// Something like this
readMessages()
    .buffer({ message -> message.size }, 5000)

Есть ли простой способ сделать это?

1 Ответ

0 голосов
/ 13 октября 2018

Сначала я должен признаться, что я не эксперт по RxJava.Я только что нашел ваш вопрос сложным и попытался найти решение.

Существует функция window() с параметром boundaryIndicator.Вы должны создать Publisher / Flowable, который испускает элемент, если достигнут размер окна.

В примере я создал объект windowManager, который используется как boundaryIndicator.В обратном вызове onNext я вызываю windowManager и даю ему возможность открыть новое окно.

val windowManager = object {
    lateinit var emitter: FlowableEmitter<Unit>
    var windowSize: Long = 0

    fun createEmitter(emitter: FlowableEmitter<Unit>) {
        this.emitter = emitter
    }

    fun openWindowIfRequired(size: Long) {
        windowSize += size
        if (windowSize > 5) {
            windowSize = 0
            emitter.onNext(Unit)
        }
    }
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
    it.doOnNext {
        windowManager.openWindowIfRequired(it)
    }.doOnSubscribe {
        println("Open window")
    }.doOnComplete {
        println("Close window")
    }.subscribe {
        println(it)
    }
}
...