Как правильно реализовать обработку кадров в режиме реального времени с помощью RxJava? - PullRequest
0 голосов
/ 06 ноября 2018

Я делаю реактивную обертку над камерой2, моя цель - получить каждый кадр, а затем перейти к распознаванию лиц.

Итак, я создал метод-оболочку для setOnImageAvailableListener

  fun createOnImageAvailableFlowable(imageReader: ImageReader, handler: Handler): Flowable<ImageReader> {
        return Flowable.create({ subscriber ->
            imageReader.setOnImageAvailableListener({
                if (!subscriber.isCancelled)
                    subscriber.onNext(it)
            }, handler)

            subscriber.setCancellable {
                imageReader.setOnImageAvailableListener(null, null)
            }
        }, BackpressureStrategy.LATEST)
    }

Реактивная цепь выглядит следующим образом:

 createOnImageAvailableFlowable(imageReader!!, null)
 .concatMap {
     it.acquireLatestImage()?.use { image ->
        val rotation = ReactiveCamera.getRotationCompensation(cameraId!!, this, applicationContext)
        val visionImage = FirebaseVisionImage.fromMediaImage(image, rotation)
        firebaseFaceDetector
          .detectInImage(visionImage)
          .toFlowable(BackpressureStrategy.LATEST)
          .map { list ->Optional(list)}
     } ?: Flowable.just(Optional(null))
 }
 ...

Этот код работает, но вызывает некоторые задержки на поверхности предварительного просмотра, потому что все работы выполняются в основном потоке. Это должно быть выполнено в отдельном потоке. Мое наивное решение заключается в добавлении оператора наблюдений перед concatMap:

createOnImageAvailableFlowable(imageReader!!, null)
.observeOn(Schedulers.io()) // doesn't switch thread
.concatMap {
 // still main thread
}
...

Но это не влияет, все работает в основном потоке. Если я укажу concatMapEager вместо concatMap, все будет работать как положено в отдельном потоке, но кадры идут со значительной задержкой.

Что я делаю не так? Как я могу дать команду реактивному потоку быть выполненным в отдельном потоке в этом случае? Как можно обрабатывать противодавление в случае обработки кадров в реальном времени?

Upd

Я предоставил свой собственный поток, как предложено Kiskae , но теперь только первый выброс происходит в потоке планировщика, но остальные выбросы остаются в основном потоке:

createOnImageAvailableFlowable(imageReader!!, null)
.subscribeOn(AndroidSchedulers.from(nonMainThread.looper))
.concatMap {
   val t = Thread.currentThread()
   val name = t.name
   Log.d(TAG, "current thread {$name}")
 ...
}

Вывод:

D/MainActivity: current thread {Camera2}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}

1 Ответ

0 голосов
/ 06 ноября 2018

Глядя на документацию ImageReader.setOnImageAvailableListener:

Обработчик: обработчик, для которого должен быть вызван слушатель, или ноль, если слушатель должен быть вызван в петлителе вызывающего потока.

Поскольку вы подписываетесь на главный петлитель, он заканчивает настройкой обратного вызова с использованием основного петлителя, поэтому все операции до concatMap всегда выполняются в потоке приложения.

Вы можете решить эту проблему, либо предоставив обработчик вместо null, либо вызвав subscribeOn и предоставив планировщик на основе обработчика, такой как RxAndroid's HandlerScheduler.

...