RxJava2 / RxAndroidBle: подписаться на Observable от побочных эффектов - PullRequest
0 голосов
/ 20 ноября 2018

У меня есть следующий вариант использования простого процесса установки устройства BLE с использованием RxAndroidBle:

  1. Подключение к устройству BLE.
  2. Начать прослушивание характеристики уведомления и настроить синтаксический анализатор для анализа каждого входящего уведомления.Затем Parser будет использовать PublishSubject для публикации проанализированных данных.
  3. Выполнение характеристики записи для записи (согласование безопасного соединения).
  4. Подождите, пока синтаксический анализатор PublishSubject доставит проанализированный ответ от устройства- открытый ключ (который поступил через характеристику уведомления в ответ на нашу запись).
  5. Выполнить еще одну запись в характеристику записи (установить соединение как безопасное).
  6. Доставить Completable высказываниеесли процесс завершился успешно или нет.

Сейчас мое решение (не работает) выглядит так:

deviceService.connectToDevice(macAddress)
    .andThen(Completable.defer { deviceService.setupCharacteristicNotification() })
    .andThen(Completable.defer { deviceService.postNegotiateSecurity() })
    .andThen(Completable.defer {
        parser.notificationResultSubject
            .flatMapCompletable { result ->
                when (result) {
                    DevicePublicKeyReceived -> Completable.complete()
                    else -> Completable.error(Exception("Unexpected notification parse result: ${result::class}"))
                }
            }
    })
    .andThen(Completable.defer { deviceService.postSetSecurity() })

И класс DeviceService:

class DeviceService {

    /**
     * Observable keeping shared RxBleConnection for reuse by different calls
     */
    private var connectionObservable: Observable<RxBleConnection>? = null

    fun connectToDevice(macAddress: String): Completable {
        return Completable.fromAction {
            connectionObservable = 
                rxBleClient.getBleDevice(macAddress)
                .establishConnection(false) 
                .compose(ReplayingShare.instance())
        }
    }

   fun setupCharacteristicNotification(): Completable =
        connectionObservable?.let {
            it
                .switchMap { connection ->
                    connection.setupNotification(UUID_NOTIFICATION_CHARACTERISTIC)
                        .map { notificationObservable -> notificationObservable.doOnNext { bytes -> parser.parse(bytes) }.ignoreElements() }
                        .map { channel ->
                            Observable.merge(
                                Observable.never<RxBleConnection>().startWith(connection),
                                channel.toObservable()
                            )
                        }
                        .ignoreElements()
                        .toObservable<RxBleConnection>()
                }
                .doOnError { Timber.e(it, "setup characteristic") }
                .take(1).ignoreElements()
        } ?: Completable.error(CONNECTION_NOT_INITIALIZED)

   fun postNegotiateSecurity(): Completable {
        val postLength = negotiateSecurity.postNegotiateSecurityLength()
        val postPGK = negotiateSecurity.postNegotiateSecurityPGKData()

        return connectionObservable?.let {
            it.take(1)
                .flatMapCompletable { connection ->
                    postLength
                        .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                        .doOnError { Timber.e(it, "post length") }
                        .flatMap {
                            postPGK
                                .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                                .doOnError { Timber.e(it, "post PGK") }
                        }
                        .take(1).ignoreElements()
                }
        } ?: Completable.error(CONNECTION_NOT_INITIALIZED)
    }

    fun postSetSecurity(): Completable =
        connectionObservable?.let {
            it.take(1)
                .flatMapCompletable { connection ->
                    negotiateSecurity.postSetSecurity()
                        .flatMapSingle { connection.write(it.bytes.toByteArray()) }
                        .take(1).ignoreElements()
                }
        } ?: Completable.error(CONNECTION_NOT_INITIALIZED)
   }

private fun RxBleConnection.write(bytes: ByteArray): Single<ByteArray> =
    writeCharacteristic(UUID_WRITE_CHARACTERISTIC, bytes)

Проблема в том, что он застревает в deviceService.postNegotiateSecurity() и никогда не проходит.Я также не получаю никаких данных в парсере, поэтому я предполагаю, что я неправильно подписываюсь на характеристику уведомления.

negotiateSecurity.postNegotiateSecurityLength() и negotiateSecurity.postNegotiateSecurityPGKData() - это методы, которые подготавливают данные для отправки и доставляют их.как Observable<SendFragment>.Из-за ограничения размера кадра данных один кадр может быть закодирован как несколько фрагментов, которые затем испускаются этими Observable s.

1 Ответ

0 голосов
/ 03 января 2019

Резюме:

  • postNegotiateSecurity() никогда не завершается
  • negotiateSecurity.postNegotiateSecurityLength() может излучать один или несколько раз
  • negotiateSecurity.postNegotiateSecurityPGKData() может излучать один или несколько раз

Анализ (пропущены журналы для удобства чтения):

it.take(1)
    .flatMapCompletable { connection ->
        postLength
            .flatMapSingle { connection.write(it.bytes.toByteArray()) }
            .flatMap {
                postPGK // may emit more than one value
                    .flatMapSingle { connection.write(it.bytes.toByteArray()) }
            }
            .take(1) // first emission from the above `flatMap` will finish the upstream
            .ignoreElements()
    }

Каждый выброс с postLength будет начинать характеристическую запись.Каждая успешная запись запускает подписку на postPGK.Если postLength будет излучаться более одного раза - будет сделано больше подписок на postPGK.

Каждая подписка на postPGK, скорее всего, приведет к множественным выбросам.Каждая эмиссия будет затем отображена на характеристическую запись.Каждая запись, успешная запись, будет выдавать значение.

После первого выброса из вышеупомянутой записи записи будет удален восходящий поток (из-за оператора .take(1)).

Если postNegotiateSecurity()фактически запущен, он также завершит работу или выдаст ошибку (учитывая, что и postLength, и postPGK будут излучать хотя бы одно значение), поскольку здесь нет дополнительной логики.

Заключение

postNegotiateSecurity() скорее всего завершится (но не по назначению), поскольку первый пакет из postPGK завершит его.Я бы предположил, что периферийное устройство ожидает полных данных, прежде чем оно будет уведомлять что-либо, поэтому оно ожидает полной передачи PGK, что не произойдет, как показано выше.

Журналы из приложения с установленным RxBleLog.setLogLevel(RxBleLog.VERBOSE) могутпомочь с пониманием того, что на самом деле произошло.

...