Повышение производительности при использовании потокового RxJava для обработки запросов Android Room - PullRequest
0 голосов
/ 08 ноября 2019

Мое приложение собирает значения датчика с акселерометра с максимально возможной частотой дискретизации (~ 200 Гц на моем устройстве) и сохраняет значения в базе данных Room. Я также хочу часто обновлять некоторые графики с последними измерениями, скажем, частотой обновления 5 раз в секунду. С тех пор, как приложение также собирает линейное ускорение (без g) также с частотой ~ 200 Гц (поэтому два датчика каждый с приблизительно 200 Гц, вставляя значения в базу данных), я заметил сильное снижение производительности приложений, и у меня задержка составляет несколько секундмежду собранными значениями ускорения и их отображением на графике. Исходя из профилировщика, я предполагаю, что RxComputationThread является узким местом, так как он активен почти все время из-за Flowables.

Я использую sample (), чтобы ограничить обновления получателя, так как мои графики не должны обновлять супердовольно часто. Это привело к приемлемой производительности, когда я просто собрал один датчик. Я видел, что RxJava предоставляет метод interval () для ограничения частоты излучения со стороны излучателя, но мне это кажется недоступным? (Неразрешенная ссылка).

Может быть, у кого-то есть идеи, как улучшить производительность? Мне нравятся концепции RxJava и Room в целом, и я хотел бы придерживаться их, но я довольно сильно застрял на этом этапе.

Вот код, который я использую для наблюдения таблицы Room SQL и обновления графиков:

// Observe changes to the datasource and create a new subscription if necessary
sharedViewModel.dataSource.observe(viewLifecycleOwner, Observer { source ->
    Log.d("TAG", "Change observed!")
    when (source) {
        "acc" -> {
            val disposableDataSource =
                sharedViewModel.lastSecondsAccelerations
                    .sample(200, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop()
                    .subscribeOn(Schedulers.io())
                    .subscribe { lastMeasurements ->
                        Log.d("TAG", Thread.currentThread().name)
                        if (sharedViewModel.isReset.value == true && lastMeasurements.isNotEmpty()) {
                            val t =
                                lastMeasurements.map { (it.time.toDouble() * 1e-9) - (lastMeasurements.last().time.toDouble() * 1e-9) }
                            val accX = lastMeasurements.map { it.accX.toDouble() }
                            val accY = lastMeasurements.map { it.accY.toDouble() }
                            val accZ = lastMeasurements.map { it.accZ.toDouble() }

                            // Update plots
                            updatePlots(t, accX, accY, accZ)
                        }
                    }
            compositeDisposable.clear()
            compositeDisposable.add(disposableDataSource)
        }
        "lin_acc" -> {
            val disposableDataSource =
                sharedViewModel.lastSecondsLinAccelerations
                    .sample(200, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop()
                    .subscribeOn(Schedulers.io())
                    .subscribe { lastMeasurements ->
                        Log.d("TAG", Thread.currentThread().name)
                        if (sharedViewModel.isReset.value == true && lastMeasurements.isNotEmpty()) {
                            val t =
                                lastMeasurements.map { (it.time.toDouble() * 1e-9) - (lastMeasurements.last().time.toDouble() * 1e-9) }
                            val accX = lastMeasurements.map { it.accX.toDouble() }
                            val accY = lastMeasurements.map { it.accY.toDouble() }
                            val accZ = lastMeasurements.map { it.accZ.toDouble() }

                            // Update plots
                            updatePlots(t, accX, accY, accZ)
                        }
                    }
            compositeDisposable.clear()
            compositeDisposable.add(disposableDataSource)
        }
    }
})

Запрос на получение последних 10 секунд измерений

@Query("SELECT * FROM acc_measurements_table WHERE time > ((SELECT MAX(time) from acc_measurements_table)- 1e10)")
fun getLastAccelerations(): Flowable<List<AccMeasurement>>

1 Ответ

0 голосов
/ 12 ноября 2019

Спасибо за ваши комментарии, теперь я понял, в чем заключалось узкое место. Проблема была в огромном количестве вставных звонков, что не слишком удивительно. Но можно улучшить производительность, используя некоторый буфер для одновременной вставки нескольких строк.

Это то, что я добавил, если кто-то работает в той же ситуации:

class InsertHelper(private val repository: Repository){
    var compositeDisposable = CompositeDisposable()

    private val measurementListAcc: FlowableList<AccMeasurement> = FlowableList()
    private val measurementListLinAcc: FlowableList<LinAccMeasurement> = FlowableList()

    fun insertAcc(measurement: AccMeasurement) {
        measurementListAcc.add(measurement)
    }
    fun insertLinAcc(measurement: LinAccMeasurement) {
        measurementListLinAcc.add(measurement)
    }

    init {
        val disposableAcc = measurementListAcc.subject
            .buffer(50)
            .subscribe {measurements ->
                GlobalScope.launch {
                    repository.insertAcc(measurements)
                }
                measurementListAcc.remove(measurements as ArrayList<AccMeasurement>)
            }

        val disposableLinAcc = measurementListLinAcc.subject
            .buffer(50)
            .subscribe {measurements ->
                GlobalScope.launch {
                    repository.insertLinAcc(measurements)
                }
                measurementListLinAcc.remove(measurements as ArrayList<LinAccMeasurement>)
            }

        compositeDisposable.add(disposableAcc)
        compositeDisposable.add(disposableLinAcc)
    }
}
// Dynamic list that can be subscribed on
class FlowableList<T> {
    private val list: MutableList<T> = ArrayList()
    val subject = PublishSubject.create<T>()

    fun add(value: T) {
        list.add(value)
        subject.onNext(value)
    }

    fun remove(value: ArrayList<T>) {
        list.removeAll(value)
    }
}

Я в основном использую динамический список для буферизации нескольких десятков образцов измерений, затем вставляю их целиком в базу данных комнат и удаляю их из динамического списка. Вот также некоторая информация, почему пакетная вставка выполняется быстрее: https://hackernoon.com/squeezing-performance-from-sqlite-insertions-with-room-d769512f8330

Я все еще новичок в разработке Android, поэтому, если вы видите некоторые ошибки или у вас есть предложения, я ценю каждый комментарий:)

...