Исключение HiveMQ сработало после режима полета - PullRequest
0 голосов
/ 18 сентября 2019

Я успешно реализовал HiveMQ в фоновом потоке, но есть небольшая проблема после включения режима полета.Журнал показывает причину, но я думаю, что чего-то не хватает, и я не вижу, где находится отсутствующий обработчик ошибок.

Журнал:

com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
System.err  W  io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call

Код реализации:

    client = Mqtt5Client.builder()
            .serverHost(host)
            .serverPort(port)
            .identifier(clientId)
            .addDisconnectedListener(new MqttClientDisconnectedListener() {
                @Override
                public void onDisconnected(MqttClientDisconnectedContext context) {
                    Log.d(TAG, "On disconnected... " + context.getCause());
                }
            })
            .automaticReconnectWithDefaultConfig()
            .buildRx();
    Mqtt5Connect connect = Mqtt5Connect.builder()
            .willPublish()
                .topic(willTopic)
            .applyWillPublish()
            .build();

    Completable connectScenario = client.connect(connect)
            .doOnSuccess(this::connectSuccess)
            .doOnError(this::connectFailed)
            .ignoreElement();

    Single<Mqtt5PublishResult> publishConnect
            = client.publish(Flowable.just(
                    Mqtt5Publish.builder()
                            .topic("d/" + this.clientId + START)
                            .payload(startData.toByteArray())
                            .build())).singleOrError();

            connectScenario
            .andThen(publishConnect)
            .doOnSuccess(this::onConnectSuccess)
            .doOnError(this::disconnectError)
            .subscribe();

Конечно, чего-то не хватает, но вопрос в том, где мне обрабатывать события отключения.

1 Ответ

4 голосов
/ 19 сентября 2019

RxJava не обрабатывает ошибки как обработанные, если вы добавляете обратный вызов doOnError.

Вы можете добавить свой обработчик ошибок к вызову subscribe:

connectScenario
    .andThen(publishConnect)
    .subscribe(this::onConnectSuccess, this::disconnectError);

Вместо этого вы можететакже игнорируйте ошибку после ее обработки в обратном вызове doOnError:

connectScenario
    .andThen(publishConnect)
    .doOnSuccess(this::onConnectSuccess)
    .doOnError(this::disconnectError)
    .ignoreElement().onErrorComplete()
    .subscribe();

Если вы хотите допустить временную недоступность сети, вы должны использовать sessionExpiryInterval> 0 и автоматическое переподключение.

...