Я тестирую простую модель клиент-сервер MQTT, где клиент многократно отправляет запросы на сервер и ожидает ответов на каждый запрос. Поток такой:
- Клиент подписывается на тему «сервер / клиент» с QoS 1
- Сервер подписывается на тему «клиент / сервер» с Qos 1
- Клиент отправляет на сервер 1000 сообщений, при доставке сообщений клиент увеличивает свой счетчик сообщений на 1.
- Сервер получает новое сообщение и вызывает зарегистрированный обратный вызов. При обратном вызове сервер отправляет простое сообщение «сервер / клиент». При доставке сообщений сервер увеличивает свой счетчик messagesOut.
- В новых сообщениях в теме «сервер / клиент» клиент увеличивает свои сообщения в счетчике на 1.
- Как сервер, так и клиент остановятся, когда messagesIn клиента и сообщения сервера OUT равны 1000.
С уровнем QoS 1 я ожидаю, что сервер полностью получит 1000 сообщений, отправленных клиентом, и отправит 1000 ответов обратно. Однако моя программа всегда зависает, когда сервер получает и отправляет около 200-300 сообщений. Когда я закомментирую звонок на MQTTAsync_sendMessage()
, сервер может получить все 1000 сообщений. Я подозреваю, что это вызвано каким-то состоянием гонки или ограничением потоков, но я не слишком уверен.
Обратный вызов для новых сообщений на сервере выглядит следующим образом:
#define TOPIC_IN "client/server"
#define TOPIC_OUT "server/client"
#define PAYLOAD "hello client\n"
unsigned long long messagesIn = 0, messagesOut = 0;
int on_message_arrival (void *context, char *topic_name,
int topic_length, MQTTAsync_message *message) {
if (!strcmp(topic_name, TOPIC_IN)) {
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic_name);
messagesIn++;
MQTTAsync_message pub_message = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
pub_message.payload = PAYLOAD;
pub_message.payloadlen = strlen(PAYLOAD);
pub_message.qos = QOS1;
pub_message.retained = 0;
delivered_token = 0;
opts.context = &message;
MQTTAsync_sendMessage((MQTTAsync)context, TOPIC_OUT, &pub_message, &opts);
} else {
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topic_name);
}
return 1;
}
Любое руководство, чтобы помочь мне отладить это приветствуется. Большое спасибо!