Принудительно использовать буфер Rx Java для flu sh по сигналу - PullRequest
0 голосов
/ 28 января 2020

Я пишу приложение Android с использованием Rx java, приложение просто собирает данные датчика и сохраняет их в файлах, когда буфер заполнен. Для этой цели я использую PublishProcessor, который выдает значение каждый раз, когда обнаруживается событие датчика.

У меня есть следующие классы помощников:

interface RxVariableInterface<T,R>{
    var value : T
    val observable : R
}

//Value is received on subscribing
class ProcessablePublishVariable<T> (defaultValue: T) : RxVariableInterface<T,PublishProcessor<T>>{
    override var value: T = defaultValue
        set(value) {
            field = value
            observable.onNext(value)
        }
    override val observable = PublishProcessor.create<T>()
}

...

var imuProcessablePublishVariable  : ProcessablePublishVariable<SensorProto6> = ProcessablePublishVariable(SensorProto6( ... ))

Когда происходит событие датчика, я просто делаю следующее:

imuProcessablePublishVariable.value = SensorProto6(...)

На стороне слушателя я имею создал Observer, который выполняет упаковку данных в текстовые файлы и подписывается оператором буфера:

class MySubscriber<List<SensorProto6>> : Subscriber<List<SensorProto6>> {

    var subscription : Subscription? = null

    override fun onError(e: Throwable) {
        Log.e("RxJavaHAndlerProcessor","${e.stackTrace}")
    }
    override fun onSubscribe(s: Subscription) {
        subscription = s
        subscription!!.request(1)
    }
    override fun onComplete() { ... }

    override fun onNext(t: List<SensorProto6>) {
         // Post the buffer list to the writer thread
         mWorkerHandler?.post(WriterRunnable(t, mContext,SensorProtos.SensorHeader.SensorType.IMU))
         subscription!!.request(1)
    }

}
...

imuProcessablePublishVariable.observable
     .subscribeOn(Schedulers.io())
     .buffer(500)
     .observeOn(Schedulers.io())
     .subscribe(mySubscriber)

Все работает как положено, я получаю списки показаний датчика, содержащие 500 элементов. Есть ли способ сделать flu sh буфер и создать частичный список?

Например, пользователь останавливает приложение, когда буфер заполнен на 70%, я хотел бы получить список ожидания, не дожидаясь заполнения буфера. Есть ли другой способ реализовать эту функциональность?

...