QoS = 1 с сообщениями о пропущенных подписках MqttAsyncClient - PullRequest
0 голосов
/ 25 февраля 2020

Служба переднего плана выполняет роль клиента MQTT. Я использую MqttAsyncClient mqttClient для этой цели.
Я использую QoS=1 для подписки на topi c:

mqttClient.subscribe("sensors/s1/", 1);

Но в случае, если мой телефон отключается на некоторое время он пропускает сообщения текущего периода. Весь код приведен ниже.

Я использую другое приложение, которым пользуюсь MqttAndroidClient mqttAndroidClient, и в этом случае QoS = 1 возвращает все пропущенные сообщения.

mqttAndroidClient.subscribe(topic, 1, null, new IMqttActionListener() {...})

Зачем нужна подписка с MqttAsyncClient с QoS = 1 не получает все сообщения?

Весь код:

public class MqttGndService extends Service {

    private String ip="ssl:myserver",port="8887";
    private final IBinder mBinder = new LocalBinder();
    private Handler mHandler;


    private static final String TAG = "mqttservice";
    private static boolean hasWifi = false;
    private static boolean hasMmobile = false;
    private ConnectivityManager mConnMan;
    private volatile IMqttAsyncClient mqttClient;
    private String uniqueID;


    class MQTTBroadcastReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {

            IMqttToken token;
            boolean hasConnectivity = false;
            boolean hasChanged = false;
            NetworkInfo infos[] = mConnMan.getAllNetworkInfo();
            for (int i = 0; i < infos.length; i++) {
                if (infos[i].getTypeName().equalsIgnoreCase("MOBILE")) {
                    if ((infos[i].isConnected() != hasMmobile)) {
                        hasChanged = true;
                        hasMmobile = infos[i].isConnected();
                    }
                    Timber.tag(Utils.TIMBER_TAG).v( infos[i].getTypeName() + " is " + infos[i].isConnected());
                } else if (infos[i].getTypeName().equalsIgnoreCase("WIFI")) {
                    if ((infos[i].isConnected() != hasWifi)) {
                        hasChanged = true;
                        hasWifi = infos[i].isConnected();
                    }
                    Timber.tag(Utils.TIMBER_TAG).v(infos[i].getTypeName() + " is " + infos[i].isConnected());
                }
            }
            hasConnectivity = hasMmobile || hasWifi;
            Timber.tag(Utils.TIMBER_TAG).v( "hasConn: " + hasConnectivity + " hasChange: " + hasChanged + " - " + (mqttClient == null || !mqttClient.isConnected()));
            if (hasConnectivity && hasChanged && (mqttClient == null || !mqttClient.isConnected())) {
                Timber.tag(Utils.TIMBER_TAG).v("Ready to connect");
                doConnect();
                Timber.tag(Utils.TIMBER_TAG).v("do connect done");

            } else
            {
                Timber.tag(Utils.TIMBER_TAG).v("Connection not possible");
            }



        }
    }


    public class LocalBinder extends Binder {
        public MqttGndService getService() {
            // Return this instance of LocalService so clients can call public methods
            return MqttGndService.this;
        }
    }

    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }

    public void publish(String topic, MqttMessage message) {
        SharedPreferences sharedPref = PreferenceManager.getDefaultSharedPreferences(this);// we create a 'shared" memory where we will share our preferences for the limits and the values that we get from onsensorchanged
        try {

            mqttClient.publish(topic, message);

        } catch (MqttException e) {
            e.printStackTrace();
        }

    }


    @Override
    public void onCreate() {
        Timber.tag(Utils.TIMBER_TAG).v("Creating MQTT service");
        mHandler = new Handler();//for toasts
        IntentFilter intentf = new IntentFilter();
        setClientID();
        intentf.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
        registerReceiver(new MQTTBroadcastReceiver(), new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
        mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    }

    @Override
    public void onConfigurationChanged(Configuration newConfig) {
        Timber.tag(Utils.TIMBER_TAG).v( "onConfigurationChanged()");
        android.os.Debug.waitForDebugger();
        super.onConfigurationChanged(newConfig);

    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        Timber.tag(Utils.TIMBER_TAG).v("Service onDestroy");

    }


    private void setClientID() {
        uniqueID = android.provider.Settings.Secure.getString(getContentResolver(), android.provider.Settings.Secure.ANDROID_ID);
        Timber.tag(Utils.TIMBER_TAG).v("uniqueID=" + uniqueID);

    }


    private void doConnect() {
        String broker = ip + ":" + port;
        Timber.tag(Utils.TIMBER_TAG).v("mqtt_doConnect()");
        IMqttToken token;
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setMaxInflight(100);//handle more messages!!so as not to disconnect
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(1000);
        options.setKeepAliveInterval(300);
        options.setUserName("cc50e3e91bf4");
        options.setPassword("b".toCharArray());

        try {
            options.setSocketFactory(SocketFactoryMQ.getSocketFactory(this,""));
        } catch (KeyStoreException e) {
            e.printStackTrace();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (KeyManagementException e) {
            e.printStackTrace();
        } catch (CertificateException e) {
            e.printStackTrace();
        } catch (UnrecoverableKeyException e) {
            e.printStackTrace();
        }

        Timber.tag(Utils.TIMBER_TAG).v("set socket factory done");
        try {


            mqttClient = new MqttAsyncClient(broker, uniqueID, new MemoryPersistence());
            token = mqttClient.connect(options);
            token.waitForCompletion(3500);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    try {
                        mqttClient.disconnectForcibly();
                        mqttClient.connect();
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void messageArrived(String topic, MqttMessage msg) throws Exception {
                    Timber.tag(Utils.TIMBER_TAG).v("Message arrived from topic " + topic+ "  msg: " + msg );



                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("published");
                }
            });
            Timber.tag(Utils.TIMBER_TAG).v("will subscribe");
            mqttClient.subscribe("sensors/s1/", 1);



        } catch (MqttSecurityException e) {
            Timber.tag(Utils.TIMBER_TAG).v("general connect exception");
            e.printStackTrace();
        } catch (MqttException e) {
            switch (e.getReasonCode()) {
                case MqttException.REASON_CODE_BROKER_UNAVAILABLE:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE BROKER_UNAVAILABLE!", 1500));
                    break;
                case MqttException.REASON_CODE_CLIENT_TIMEOUT:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE CLIENT_TIMEOUT!", 1500));
                    break;
                case MqttException.REASON_CODE_CONNECTION_LOST:
                    mHandler.post(new ToastRunnable("WE ARE OFFLINE CONNECTION_LOST!", 1500));
                    break;
                case MqttException.REASON_CODE_SERVER_CONNECT_ERROR:
                    Timber.tag(Utils.TIMBER_TAG).v( "c " + e.getMessage());
                    e.printStackTrace();
                    break;
                case MqttException.REASON_CODE_FAILED_AUTHENTICATION:
                    Intent i = new Intent("RAISEALLARM");
                    i.putExtra("ALLARM", e);
                    Timber.tag(Utils.TIMBER_TAG).v("b " + e.getMessage());
                    break;
                default:
                    Timber.tag(Utils.TIMBER_TAG).v( "a " + e.getMessage() +" "+ e.toString());
            }
        }
        mHandler.post(new ToastRunnable("WE ARE ONLINE!", 500));

    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        Timber.tag(Utils.TIMBER_TAG).v("onStartCommand");
        String input = intent.getStringExtra(INTENT_ID);
        Timber.tag(Utils.TIMBER_TAG).v("onStartCommand "+ input);

        Intent notificationIntent = new Intent(this, MainActivity.class);
        PendingIntent pendingIntent = PendingIntent.getActivity(this,
                0, notificationIntent, 0);

        Notification notification = new NotificationCompat.Builder(this, CHANNEL_ID)
                .setContentTitle("Example Service")
                .setContentText(input)
                .setSmallIcon(R.drawable.ic_android)
                .setContentIntent(pendingIntent)
                .build();

        startForeground(1, notification);


        PowerManager powerManager = (PowerManager) getSystemService(POWER_SERVICE);
        PowerManager.WakeLock wakeLock = powerManager.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK, "MyApp::MyWakelockTag");
        wakeLock.acquire();


        return START_STICKY;
    }
}

1 Ответ

0 голосов
/ 26 февраля 2020

Вы устанавливаете cleansession в значение true (options.setCleanSession(true)); из документов для setCleanSession :

Если установлено значение true, клиент и сервер не будут поддерживать состояние при перезапусках клиента, сервера или соединения. Это означает, что

  • Невозможно поддерживать доставку сообщения в указанное QOS, если клиент, сервер или соединение перезапускаются
  • Сервер будет обрабатывать подписку как недолговечную

Я думаю, что спецификации mqtt более четко указывают это:

Если для CleanSession установлено значение 1, клиент и сервер ДОЛЖНЫ отбросить любой предыдущий сеанс и начать новый. Этот сеанс длится столько же, сколько и сетевое соединение. Данные о состоянии, связанные с этим сеансом, НЕ ДОЛЖНЫ использоваться повторно в любом последующем сеансе

Поэтому, когда ваше приложение теряет соединение, сеанс отбрасывается, а новые сообщения не ставятся в очередь для доставки. Кроме того, если вы не подпишетесь при восстановлении соединения, вы не будете получать никаких дополнительных сообщений.

Однако имейте в виду, что если вы установите для cleansession значение false, то все новые сообщения, полученные, пока ваш клиент находится в автономном режиме, будут поставлены в очередь для доставки. (в зависимости от конфигурации посредника), и это может не соответствовать вашим ожиданиям, если клиент может находиться в автономном режиме в течение длительного времени.

...