setupNotification возвращает «Ошибка уже подключена», тогда как запрос на соединение не отправляется
На самом деле выполняется два запроса на соединение - отсюда и ошибка.Из RxBleDevice.establishConnection()
Javadoc:
* Establishes connection with a given BLE device. {@link RxBleConnection} is a handle, used to process BLE operations with a connected
* device.
В вашем коде есть две подписки на establishConnection()
Observable
.
private lateinit var device: RxBleDevice
private var connectionObservable: Observable<RxBleConnection>? = null
private var rxBleConnection: RxBleConnection? = null
private val connectionDisposable = CompositeDisposable()
private val connectionStateDisposable = CompositeDisposable()
private var notifyValueChangeSubscription = CompositeDisposable()
var enableBatteryNotificationRunnable: Runnable = Runnable {
enableBatteryNotification()
}
private var myHandler = Handler()
val DELAY_BEFORE_ENABLE_NOTIFICATION: Long = 100
private fun connect() {
connectionObservable = device.establishConnection(false)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
connectionObservable?.let {
connectionDisposable.add(it.subscribe( // << Here is the first subscription
{ rxBleConnection ->
this.rxBleConnection = rxBleConnection
},
{ _ ->
Log.e("connect", "connexion error")
})
)
}
val state = device.observeConnectionStateChanges().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
connectionStateDisposable.add(
state.subscribe(
{ connectionState ->
Log.i("connect", "connexion state :$connectionState")
if(connectionState == RxBleConnection.RxBleConnectionState.CONNECTED) {
myHandler.postDelayed(enableBatteryNotificationRunnable, DELAY_BEFORE_ENABLE_NOTIFICATION);
}
}
)
{ _ ->
Log.e("connection listener", "connexion state error")
}
)
}
private fun enableBatteryNotification () {
connectionObservable?.let {
var observableToReturn = it
.flatMap { it.setupNotification(UUID_BATTERY_LEVEL) }
.doOnNext {
Log.i("NOTIFICATION", "doOnNext")
}
.flatMap { it }
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes -> // << Here is the second subscription
var strBytes = String(bytes)
Log.i("NOTIFICATION", "value change: $strBytes")
},
{ throwable ->
Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
})
)
}
}
Эта ситуацияраспространенный источник путаницы для людей, изучающих RxJava
.Есть три пути , чтобы исправить вашу ситуацию.От минимального до большого объема работы:
Поделиться establishConnection
Observable
Можно поделиться одним RxBleConnection
с RxReplayingShare
.Измените это:
connectionObservable = device.establishConnection(false)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
На это:
connectionObservable = device.establishConnection(false)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(ReplayingShare.instance())
Используйте свойство rxBleConnection: RxBleConnection?
Вместо:
connectionObservable?.let {
var observableToReturn = it
.flatMap { it.setupNotification(UUID_BATTERY_LEVEL) }
.doOnNext {
Log.i("NOTIFICATION", "doOnNext")
}
.flatMap { it }
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes -> // << Here is the second subscription
var strBytes = String(bytes)
Log.i("NOTIFICATION", "value change: $strBytes")
},
{ throwable ->
Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
})
)
}
Сделайте это:
rxBleConnection?.let {
var observableToReturn = rxBleConnection.setupNotification(UUID_BATTERY_LEVEL)
.doOnNext {
Log.i("NOTIFICATION", "doOnNext")
}
.flatMap { it }
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
notifyValueChangeSubscription.add(observableToReturn.subscribe({ bytes -> // << Here is the second subscription
var strBytes = String(bytes)
Log.i("NOTIFICATION", "value change: $strBytes")
},
{ throwable ->
Log.e("NOTIFICATION", "Error in notification process: " + throwable.message)
})
)
}
Это не рекомендуется, так как вы можете получить RxBleConnection
, который больше не действителен, поскольку, возможно, он был отключен перед вызовом enableBatteryNotification()
Изменить поток вашегокод для использования одного .subscribe()
Это индивидуальное решение с учетом вашего конкретного варианта использования.К сожалению, с добавленной вами информацией недостаточно для создания заменяющего кода, но это может выглядеть примерно так:
device.establishConnection(false)
.flatMap { connection ->
Observable.merge(
connection.readCharacteristic(uuid0).map { ReadResult(uuid0, it) }.toObservable(),
connection.setupNotification(uuid1).flatMap { it }.map { NotifyResult(uuid1, it) }.delaySubscription(100, TimeUnit.MILLISECONDS)
)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ /* handle ReadResult/NotifyResult */ },
{ /* handle potential errors */ }
)
Где ReadResult
и NotifyResult
будут data class
, что займетUUID
и ByteArray