Я делаю реактивную обертку над камерой2, моя цель - получить каждый кадр, а затем перейти к распознаванию лиц.
Итак, я создал метод-оболочку для setOnImageAvailableListener
fun createOnImageAvailableFlowable(imageReader: ImageReader, handler: Handler): Flowable<ImageReader> {
return Flowable.create({ subscriber ->
imageReader.setOnImageAvailableListener({
if (!subscriber.isCancelled)
subscriber.onNext(it)
}, handler)
subscriber.setCancellable {
imageReader.setOnImageAvailableListener(null, null)
}
}, BackpressureStrategy.LATEST)
}
Реактивная цепь выглядит следующим образом:
createOnImageAvailableFlowable(imageReader!!, null)
.concatMap {
it.acquireLatestImage()?.use { image ->
val rotation = ReactiveCamera.getRotationCompensation(cameraId!!, this, applicationContext)
val visionImage = FirebaseVisionImage.fromMediaImage(image, rotation)
firebaseFaceDetector
.detectInImage(visionImage)
.toFlowable(BackpressureStrategy.LATEST)
.map { list ->Optional(list)}
} ?: Flowable.just(Optional(null))
}
...
Этот код работает, но вызывает некоторые задержки на поверхности предварительного просмотра, потому что все работы выполняются в основном потоке. Это должно быть выполнено в отдельном потоке. Мое наивное решение заключается в добавлении оператора наблюдений перед concatMap:
createOnImageAvailableFlowable(imageReader!!, null)
.observeOn(Schedulers.io()) // doesn't switch thread
.concatMap {
// still main thread
}
...
Но это не влияет, все работает в основном потоке. Если я укажу concatMapEager вместо concatMap, все будет работать как положено в отдельном потоке, но кадры идут со значительной задержкой.
Что я делаю не так? Как я могу дать команду реактивному потоку быть выполненным в отдельном потоке в этом случае? Как можно обрабатывать противодавление в случае обработки кадров в реальном времени?
Upd
Я предоставил свой собственный поток, как предложено Kiskae , но теперь только первый выброс происходит в потоке планировщика, но остальные выбросы остаются в основном потоке:
createOnImageAvailableFlowable(imageReader!!, null)
.subscribeOn(AndroidSchedulers.from(nonMainThread.looper))
.concatMap {
val t = Thread.currentThread()
val name = t.name
Log.d(TAG, "current thread {$name}")
...
}
Вывод:
D/MainActivity: current thread {Camera2}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}
D/MainActivity: current thread {main}