В моем приложении для Android у меня есть служба, у которой есть экземпляр класса (назовите его MQTTClient), который публикует или подписывается на сервер MQTT.Я хочу использовать RxJava с Eclipse Paho Android для управления операциями подписки и публикации MQTT.
Я использую Single
наблюдаемый и SingleObserver
для публикации, Flowable
наблюдаемый и Observer
для подписки.Но я застрял в точке, где я не могу понять, когда и как избавиться от Disposable
.
. Вот Single
Наблюдаемый из publish
метода в MQTTClient
Single<IMqttToken> pubTokenSingle = Single.create(new SingleOnSubscribe<IMqttToken>() {
@Override
public void subscribe(final SingleEmitter<IMqttToken> emitter) throws Exception {
final IMqttToken token = client.publish(topic, mqttMessage);
token.setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
emitter.onSuccess(token);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
boolean hasNetwork = isOnline(context);
if (hasNetwork && Objects.equals(((MqttException) exception).getReasonCode(),
MqttException.REASON_CODE_CLIENT_NOT_CONNECTED)) {
//connect client and retry MQTT pub
try {
//connect() is a method in MQTTClient
//connect() method also utilizes RxJava2 Single.
//Same issue of disposing a `Disposable` exists in that method as well
connect();
//call the publish method again
} catch (MqttException e) {
e.printStackTrace();
emitter.onError(e);
}
} else if (!hasNetwork) {
emitter.onError(exception);
} else {
emitter.onError(exception);
}
}
});
}
});
Вот SingleObserver
final Disposable[] disposable = new Disposable[1];
SingleObserver<IMqttToken> pubTokenSingleObserver = new SingleObserver<IMqttToken>() {
@Override
public void onSubscribe(Disposable d) {
disposable[0] = d;
}
@Override
public void onSuccess(IMqttToken iMqttToken) {
//disposable[0].dispose();
//Planning to use the above as last resort
//Also thought of moving this to doOnSuccess
}
@Override
public void onError(Throwable e) {
//Put topic name, and mqtt message in SQLite
//disposable[0].dispose();
//Planning to use the above as last resort
//Also thought of moving this to doOnError
}
};
Кто-то предположил, что у меня есть метод очистки в соответствующем классе, который вызывается при вызове onStop
.
Я обеспокоен тем, что произойдет, если я использую disposable.dispose()
, и работа сети все еще продолжается.
Как мне убедиться, что если операция не завершена, то по крайней мередетали сохраняются в БД SQLite? Я надеюсь, что решение будет легко расширяемым и для подписки.Если нет, то расскажите мне о возможных подводных камнях.
Это учебный проект, в котором я изучаю RxJava2, поэтому я не выбрал RxMQTT.