Я пытаюсь сделать параллельный сеанс к серверу MQTT, и все клиенты отключатся, как только будут установлены все соединения.
В приведенном ниже коде издателя я пытаюсь создать параллельные сеансы, каждый из которых отправляет 50 сообщений. И так будет создано 500 потоков, и каждый отправит 50 сообщений. Но для создания 100 соединений требуется 10 минут. Есть ли какая-либо ошибка в кодировании, и возможно ли уменьшить скорость изменения скорости соединения в приведенном ниже коде, потому что, как и в случае с Голангом, скорость соединения там высока.
Ниже приведен код издателя:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
public class Publisher extends Thread{
public static final String test_topic = "test";
private MqttClient client;
public static final String BROKER_URL = "tcp://192.168.1.129:1883";
CountDownLatch latch;
public Publisher(CountDownLatch latch) {
super();
this.latch = latch;
}
public void Publisher() {
String clientid=Thread.currentThread().getName();
System.out.println("=========== "+clientid);
MqttConnectOptions options = null;
try {
client = new MqttClient(BROKER_URL, clientid);
options = new MqttConnectOptions();
options.setCleanSession(true);
options.setMaxInflight(50);
client.connect(options);
} catch (MqttException e) {
try {
client.connect(options);
} catch (MqttSecurityException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (MqttException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
e.printStackTrace();
//System.exit(1);
}
}
@Override
public void run() {
// TODO Auto-generated method stub
Publisher();
System.out.println(Thread.currentThread().getName());
try {
for(int i=0;i<50;i++)
{
//Thread.sleep(20);
publishTemperature();
}
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} /*catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
}
public void publishTemperature() throws MqttPersistenceException, MqttException {
final MqttTopic test = client.getTopic(test_topic);
final String temperature=""{\"test\":\"test\"}"";
test.publish(new MqttMessage(temperature.getBytes()));
//System.out.println("Published data. Topic: " + "test" + " Message: " + temperature);
}
public MqttClient getClient() {
return client;
}
public void setClient(MqttClient client) {
this.client = client;
}
}
Ниже приведен основной метод:
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
public class test {
static Publisher[] Publisher=null;
public static void main(String[] args) throws MqttPersistenceException, MqttException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(50);
Publisher = new Publisher[500];
for(int i=0;i<500;i++)
{
Thread.sleep(10);
Publisher[i]=new Publisher(latch);
Publisher[i].start();
}
latch.await();
for(int i=0;i<500;i++)
{
//Thread.sleep(10);
Publisher[i].getClient().disconnectForcibly(25);
}
}
}
Здесь все соединения будут соединяться и устанавливать постоянное соединение до 500 соединений. После этого все соединения будут разорваны один раз.