Использование RxJava с Paho MQTT - PullRequest
0 голосов
/ 06 июня 2018

В моем приложении для Android у меня есть служба, у которой есть экземпляр класса (назовите его MQTTClient), который публикует или подписывается на сервер MQTT.Я хочу использовать RxJava с Eclipse Paho Android для управления операциями подписки и публикации MQTT.

Я использую Single наблюдаемый и SingleObserver для публикации, Flowable наблюдаемый и Observer для подписки.Но я застрял в точке, где я не могу понять, когда и как избавиться от Disposable.

. Вот Single Наблюдаемый из publish метода в MQTTClient

Single<IMqttToken> pubTokenSingle = Single.create(new SingleOnSubscribe<IMqttToken>() {
  @Override
  public void subscribe(final SingleEmitter<IMqttToken> emitter) throws Exception {
    final IMqttToken token = client.publish(topic, mqttMessage);
      token.setActionCallback(new IMqttActionListener() {
        @Override
        public void onSuccess(IMqttToken asyncActionToken) {
            emitter.onSuccess(token);
        }

        @Override
        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            boolean hasNetwork = isOnline(context);
          if (hasNetwork && Objects.equals(((MqttException) exception).getReasonCode(),
              MqttException.REASON_CODE_CLIENT_NOT_CONNECTED)) {
            //connect client and retry MQTT pub
            try {
              //connect() is a method in MQTTClient
              //connect() method also utilizes RxJava2 Single.
              //Same issue of disposing a `Disposable` exists in that method as well 
              connect();
              //call the publish method again
            } catch (MqttException e) {
              e.printStackTrace();
              emitter.onError(e);
            }
          } else if (!hasNetwork) {
            emitter.onError(exception);
          } else {
            emitter.onError(exception);
          }
        }
    });
  }
});

Вот SingleObserver

final Disposable[] disposable = new Disposable[1];
SingleObserver<IMqttToken> pubTokenSingleObserver = new SingleObserver<IMqttToken>() {
  @Override
  public void onSubscribe(Disposable d) {
    disposable[0] = d;
  }

  @Override
  public void onSuccess(IMqttToken iMqttToken) {
    //disposable[0].dispose();
    //Planning to use the above as last resort
    //Also thought of moving this to doOnSuccess
  }

  @Override
  public void onError(Throwable e) {
    //Put topic name, and mqtt message in SQLite

    //disposable[0].dispose();
    //Planning to use the above as last resort
    //Also thought of moving this to doOnError
  }
};

Кто-то предположил, что у меня есть метод очистки в соответствующем классе, который вызывается при вызове onStop.

Я обеспокоен тем, что произойдет, если я использую disposable.dispose(), и работа сети все еще продолжается.

Как мне убедиться, что если операция не завершена, то по крайней мередетали сохраняются в БД SQLite? Я надеюсь, что решение будет легко расширяемым и для подписки.Если нет, то расскажите мне о возможных подводных камнях.

Это учебный проект, в котором я изучаю RxJava2, поэтому я не выбрал RxMQTT.

...