Java mqtt одновременных подключений требует времени - PullRequest
0 голосов
/ 16 апреля 2019

Я пытаюсь сделать параллельный сеанс к серверу MQTT, и все клиенты отключатся, как только будут установлены все соединения.

В приведенном ниже коде издателя я пытаюсь создать параллельные сеансы, каждый из которых отправляет 50 сообщений. И так будет создано 500 потоков, и каждый отправит 50 сообщений. Но для создания 100 соединений требуется 10 минут. Есть ли какая-либо ошибка в кодировании, и возможно ли уменьшить скорость изменения скорости соединения в приведенном ниже коде, потому что, как и в случае с Голангом, скорость соединения там высока.

Ниже приведен код издателя:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class Publisher extends Thread{

    public static final String test_topic = "test";
    private MqttClient client;
    public static final String BROKER_URL = "tcp://192.168.1.129:1883";
    CountDownLatch latch;

    public Publisher(CountDownLatch latch) {
      super();
      this.latch = latch;
     }

    public void Publisher() {
        String clientid=Thread.currentThread().getName();
        System.out.println("=========== "+clientid);
        MqttConnectOptions options = null;
        try {

             client = new MqttClient(BROKER_URL, clientid);
             options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setMaxInflight(50);
           client.connect(options);
        } catch (MqttException e) {
            try {
                client.connect(options);
            } catch (MqttSecurityException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            } catch (MqttException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            e.printStackTrace();
            //System.exit(1);
        }
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Publisher();
        System.out.println(Thread.currentThread().getName());
        try {
            for(int i=0;i<50;i++)
            {
            //Thread.sleep(20);
            publishTemperature();
            }


        } catch (MqttPersistenceException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } /*catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
    }
    public void publishTemperature() throws MqttPersistenceException, MqttException {
        final MqttTopic test = client.getTopic(test_topic);
         final String temperature=""{\"test\":\"test\"}"";
         test.publish(new MqttMessage(temperature.getBytes()));

         //System.out.println("Published data. Topic: " + "test" + "  Message: " + temperature);

    }

    public MqttClient getClient() {
        return client;
    }

    public void setClient(MqttClient client) {
        this.client = client;
    }
}

Ниже приведен основной метод:

 import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

    public class test {
        static Publisher[] Publisher=null;

        public static void main(String[] args) throws MqttPersistenceException, MqttException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(50);
            Publisher = new Publisher[500];
            for(int i=0;i<500;i++)
            {
                Thread.sleep(10);
                Publisher[i]=new Publisher(latch);
                Publisher[i].start();
            }
            latch.await();
            for(int i=0;i<500;i++)
            {
                //Thread.sleep(10);
                Publisher[i].getClient().disconnectForcibly(25);


            }

        }

    }

Здесь все соединения будут соединяться и устанавливать постоянное соединение до 500 соединений. После этого все соединения будут разорваны один раз.

1 Ответ

0 голосов
/ 16 апреля 2019

Удалите следующую строку:

Publisher[i].join();

Документы для Thread.join () говорят следующее:

public final voidjoin () throws InterruptedException

Ожидание смерти этого потока.

Вызов этого метода ведет себя точно так же, как и вызов

join(0)

Это означает, что каждый раз во время цикла он будет останавливаться и ждать, пока этот поток завершит свою задачу, прежде чем создавать новый.

Если вы удалите этот вызов, он позволит всем потокам работать параллельно.

...