Сосуществование Mqtt-Subscriber и UI сбоев - PullRequest
0 голосов
/ 26 сентября 2018

Я более или менее новичок в программировании на Java.Я работаю над программой, которая получает mqtt-сообщение, которое получает доход почти каждую секунду через брокера и имеет пользовательский интерфейс.Моя проблема в том, что если mqtt-broker запустится, пользовательский интерфейс не будет работать.

На самом деле, я хочу запустить пользовательский интерфейс, который выполняет некоторые настройки.В пользовательском интерфейсе есть одна страница, которая должна выполнять некоторые задачи анализа, например, фиксировать текущее состояние связи.Теперь я открыл брокер mqtt как счетчик в пользовательском интерфейсе -> этап пользовательского интерфейса исчезает, и запускается только брокер.

Цель должна состоять в том, чтобы первый пользовательский интерфейс выполнялся… На следующем шаге захватывается только одно сообщениеи добавленный к счетчику пользовательский интерфейс ожидает и остается открытым (видимым) во время этого процесса.После этого процесса у брокера возникает перерыв, и пользовательский интерфейс может запускаться снова и, возможно, повторяет весь процесс снова и снова.

Извините за это запутанное описание, но, возможно, у кого-то есть решение моей проблемы. Спасибо!

Ответы [ 3 ]

0 голосов
/ 27 сентября 2018

Краткое обновление ...

Более или менее решение Bara 'Hashesh работает нормально, теперь интерфейс и mqtt-broker сосуществуют.

Теперь проблема в том, что новый открытый поток является «бесконечным» потоком, потому что сообщение mqtt приходит снова и снова, пока вы не остановите его вручную.Есть ли способ прервать новый запущенный процесс ... Я читал о методе mythread.interupt, но не могу найти способ объединить его с текущим методом.

0 голосов
/ 27 сентября 2018

По сути, это общедоступный сценарий IBM, который я использую для связи.Я бросил импорт и некоторые комментарии, которые на самом деле не важны для проблемы.Чтобы ответить на ваш вопрос, Мартин, на самом деле пока нет содержимого ...

открытый класс MqttCommunicationClassv1 реализует MqttCallback {

/**
 * The main entry point of the sample.
 *
 * This method handles parsing of the arguments specified on the command-line
 * before performing the specified action.
 * mqtt_action = publish || subscribe
 */
public static void mqtt_start(String mqtt_action)
{
    // Default settings:
    boolean quietMode = false;
    String action =  mqtt_action;
    String topic = "";
    String message = "Message from client !!!";
    int qos = 2;
    String broker = "192.168.100.1";
    int port = 1883;
    String clientId = null;
    String subTopic = "RPI-Measurement";
    String pubTopic = "RPI-Measurement";
    boolean cleanSession = true; // Non durable subscriptions
    boolean ssl = false;
    String password = null;
    String userName = null;
    // Parse the arguments -

    // Validate the provided arguments
    if (!action.equals("publish") && !action.equals("subscribe")) {
        System.out.println("Invalid action: " + action);
        printHelp();
        return;
    }
    if (qos < 0 || qos > 2) {
        System.out.println("Invalid QoS: " + qos);
        printHelp();
        return;
    }
    if (topic.equals("")) {
        // Set the default topic according to the specified action
        if (action.equals("publish")) {
            topic = pubTopic;
        } else {
            topic = subTopic;
        }
    }

    String protocol = "tcp://";

    if (ssl) {
        protocol = "ssl://";
    }

    String url = protocol + broker + ":" + port;

    if (clientId == null || clientId.equals("")) {
        clientId = "SampleJavaV3_" + action;
    }

    // With a valid set of arguments, the real work of
    // driving the client API can begin
    try {
        // Create an instance of this class
        MqttCommunicationClassv1 sampleClient = new MqttCommunicationClassv1(url, clientId, cleanSession, quietMode,
                userName, password);

        // Perform the requested action
        if (action.equals("publish")) {
            sampleClient.publish(topic, qos, message.getBytes());
        } else if (action.equals("subscribe")) {
            sampleClient.subscribe(topic, qos);
        }
    } catch (MqttException me) {
        // Display full details of any exception that occurs
        System.out.println("reason " + me.getReasonCode());
        System.out.println("msg " + me.getMessage());
        System.out.println("loc " + me.getLocalizedMessage());
        System.out.println("cause " + me.getCause());
        System.out.println("excep " + me);
        me.printStackTrace();
    }
// Private instance variables
private MqttClient client;
private String brokerUrl;
private boolean quietMode;
private MqttConnectOptions conOpt;
private boolean clean;
private String password;
private String userName;

/**
 * Constructs an instance of the sample client wrapper
 * 
 * @param brokerUrl    the url of the server to connect to
 * @param clientId     the client id to connect with
 * @param cleanSession clear state at end of connection or not (durable or
 *                     non-durable subscriptions)
 * @param quietMode    whether debug should be printed to standard out
 * @param userName     the username to connect with
 * @param password     the password for the user
 * @throws MqttException
 */
public MqttCommunicationClassv1(String brokerUrl, String clientId, boolean cleanSession, boolean quietMode,
        String userName, String password) throws MqttException {
    this.brokerUrl = brokerUrl;
    this.quietMode = quietMode;
    this.clean = cleanSession;
    this.password = password;
    this.userName = userName;
    // This sample stores in a temporary directory... where messages temporarily
    // stored until the message has been delivered to the server.
    // ..a real application ought to store them somewhere
    // where they are not likely to get deleted or tampered with
    String tmpDir = System.getProperty("java.io.tmpdir");
    MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);

    try {
        // Construct the connection options object that contains connection parameters
        // such as cleanSession and LWT
        conOpt = new MqttConnectOptions();
        conOpt.setCleanSession(clean);
        if (password != null) {
            conOpt.setPassword(this.password.toCharArray());
        }
        if (userName != null) {
            conOpt.setUserName(this.userName);
        }

        // Construct an MQTT blocking mode client
        client = new MqttClient(this.brokerUrl, clientId, dataStore);

        // Set this wrapper as the callback handler
        client.setCallback(this);

    } catch (MqttException e) {
        e.printStackTrace();
        log("Unable to set up client: " + e.toString());
        System.exit(1);
    }
}

/**
 * Publish / send a message to an MQTT server
 * 
 * @param topicName the name of the topic to publish to
 * @param qos       the quality of service to delivery the message at (0,1,2)
 * @param payload   the set of bytes to send to the MQTT server
 * @throws MqttException
 */
public void publish(String topicName, int qos, byte[] payload) throws MqttException {

    // Connect to the MQTT server
    log("Connecting to " + brokerUrl + " with client ID " + client.getClientId());
    client.connect(conOpt);
    log("Connected");

    String time = new Timestamp(System.currentTimeMillis()).toString();
    log("Publishing at: " + time + " to topic \"" + topicName + "\" qos " + qos);

    // Create and configure a message
    MqttMessage message = new MqttMessage(payload);
    message.setQos(qos);

    // Send the message to the server, control is not returned until
    // it has been delivered to the server meeting the specified
    // quality of service.
    client.publish(topicName, message);

    // Disconnect the client
    client.disconnect();
    log("Disconnected");
}

/**
 * Subscribe to a topic on an MQTT server Once subscribed this method waits for
 * the messages to arrive from the server that match the subscription. It
 * continues listening for messages until the enter key is pressed.
 * 
 * @param topicName to subscribe to (can be wild carded)
 * @param qos       the maximum quality of service to receive messages at for
 *                  this subscription
 * @throws MqttException
 */
public void subscribe(String topicName, int qos) throws MqttException {

    // Connect to the MQTT server
    client.connect(conOpt);
    log("Connected to " + brokerUrl + " with client ID " + client.getClientId());

    // Subscribe to the requested topic
    // The QoS specified is the maximum level that messages will be sent to the
    // client at.
    // For instance if QoS 1 is specified, any messages originally published at QoS
    // 2 will
    // be downgraded to 1 when delivering to the client but messages published at 1
    // and 0
    // will be received at the same level they were published at.
    log("Subscribing to topic \"" + topicName + "\" qos " + qos);
    client.subscribe(topicName, qos);

    // Continue waiting for messages until the Enter is pressed
    log("Press <Enter> to exit");
    try {
        System.in.read();
    } catch (IOException e) {
        // If we can't read we'll just exit
    }

    // Disconnect the client from the server
    client.disconnect();
    log("Disconnected");
}

/**
 * Utility method to handle logging. If 'quietMode' is set, this method does
 * nothing
 * 
 * @param message the message to log
 */
private void log(String message) {
    if (!quietMode) {
        System.out.println(message);
    }
}

/****************************************************************/
/* Methods to implement the MqttCallback interface */
/****************************************************************/

/**
 * @see MqttCallback#connectionLost(Throwable)
 */
public void connectionLost(Throwable cause) {
    // Called when the connection to the server has been lost.
    // An application may choose to implement reconnection
    // logic at this point. This sample simply exits.
    log("Connection to " + brokerUrl + " lost!" + cause);
    System.exit(1);
}

/**
 * @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
 */
public void deliveryComplete(IMqttDeliveryToken token) {
    // Called when a message has been delivered to the
    // server. The token passed in here is the same one
    // that was passed to or returned from the original call to publish.
    // This allows applications to perform asynchronous
    // delivery without blocking until delivery completes.
    //
    // This sample demonstrates asynchronous deliver and
    // uses the token.waitForCompletion() call in the main thread which
    // blocks until the delivery has completed.
    // Additionally the deliveryComplete method will be called if
    // the callback is set on the client
    //
    // If the connection to the server breaks before delivery has completed
    // delivery of a message will complete after the client has re-connected.
    // The getPendingTokens method will provide tokens for any messages
    // that are still to be delivered.
}

/**
 * @return 
 * @see MqttCallback#messageArrived(String, MqttMessage)
 */
public void messageArrived(String topic, MqttMessage message) throws MqttException {

    // Called when a message arrives from the server that matches any
    // subscription made by the client
    String time = new Timestamp(System.currentTimeMillis()).toString();
    System.out.println("Time:\t" + time + "  Message:\t" + new String(message.getPayload()));
    mqttMssg = new String(message.getPayload());
    System.out.println("Instance 1");


}

/****************************************************************/
/* End of MqttCallback methods */
/****************************************************************/

static void printHelp() {
    System.out.println("Syntax:\n\n" + "    Sample [-h] [-a publish|subscribe] [-t <topic>] [-m <message text>]\n"
            + "            [-s 0|1|2] -b <hostname|IP address>] [-p <brokerport>] [-i <clientID>]\n\n"
            + "    -h  Print this help text and quit\n" + "    -q  Quiet mode (default is false)\n"
            + "    -a  Perform the relevant action (default is publish)\n"
            + "    -t  Publish/subscribe to <topic> instead of the default\n"
            + "            (publish: \"Sample/Java/v3\", subscribe: \"Sample/#\")\n"
            + "    -m  Use <message text> instead of the default\n"
            + "            (\"Message from MQTTv3 Java client\")\n"
            + "    -s  Use this QoS instead of the default (2)\n"
            + "    -b  Use this name/IP address instead of the default (m2m.eclipse.org)\n"
            + "    -p  Use this port instead of the default (1883)\n\n"
            + "    -i  Use this client ID instead of SampleJavaV3_<action>\n"
            + "    -c  Connect to the server with a clean session (default is false)\n"
            + "     \n\n Security Options \n" + "     -u Username \n" + "     -z Password \n"
            + "     \n\n SSL Options \n" + "    -v  SSL enabled; true - (default is false) "
            + "    -k  Use this JKS format key store to verify the client\n"
            + "    -w  Passpharse to verify certificates in the keys store\n"
            + "    -r  Use this JKS format keystore to verify the server\n"
            + " If javax.net.ssl properties have been set only the -v flag needs to be set\n"
            + "Delimit strings containing spaces with \"\"\n\n"
            + "Publishers transmit a single message then disconnect from the server.\n"
            + "Subscribers remain connected to the server and receive appropriate\n"
            + "messages until <enter> is pressed.\n\n");
}

}

0 голосов
/ 26 сентября 2018

Вот простой пример

public void someLongTask(){
    new Thread(()->{
        //code for task here
        Platform.runLater(()->{
            //code to update UI here
        });
    }).start()
}

в качестве заметки, я использовал лямбда-выражения, чтобы уменьшить количество кода

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...