Потеря сообщения paho.mqtt.c на сервере запросов / ответов - PullRequest
0 голосов
/ 03 июля 2019

Я тестирую простую модель клиент-сервер MQTT, где клиент многократно отправляет запросы на сервер и ожидает ответов на каждый запрос. Поток такой:

  • Клиент подписывается на тему «сервер / клиент» с QoS 1
  • Сервер подписывается на тему «клиент / сервер» с Qos 1
  • Клиент отправляет на сервер 1000 сообщений, при доставке сообщений клиент увеличивает свой счетчик сообщений на 1.
  • Сервер получает новое сообщение и вызывает зарегистрированный обратный вызов. При обратном вызове сервер отправляет простое сообщение «сервер / клиент». При доставке сообщений сервер увеличивает свой счетчик messagesOut.
  • В новых сообщениях в теме «сервер / клиент» клиент увеличивает свои сообщения в счетчике на 1.
  • Как сервер, так и клиент остановятся, когда messagesIn клиента и сообщения сервера OUT равны 1000.

С уровнем QoS 1 я ожидаю, что сервер полностью получит 1000 сообщений, отправленных клиентом, и отправит 1000 ответов обратно. Однако моя программа всегда зависает, когда сервер получает и отправляет около 200-300 сообщений. Когда я закомментирую звонок на MQTTAsync_sendMessage(), сервер может получить все 1000 сообщений. Я подозреваю, что это вызвано каким-то состоянием гонки или ограничением потоков, но я не слишком уверен.

Обратный вызов для новых сообщений на сервере выглядит следующим образом:

#define TOPIC_IN   "client/server"
#define TOPIC_OUT  "server/client"
#define PAYLOAD    "hello client\n"

unsigned long long messagesIn = 0, messagesOut = 0;

int on_message_arrival (void *context, char *topic_name, 
                        int topic_length, MQTTAsync_message *message) {
    if (!strcmp(topic_name, TOPIC_IN)) {
        MQTTAsync_freeMessage(&message);
        MQTTAsync_free(topic_name);

        messagesIn++;

        MQTTAsync_message pub_message = MQTTAsync_message_initializer;
        MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;

        pub_message.payload = PAYLOAD;
        pub_message.payloadlen = strlen(PAYLOAD);
        pub_message.qos = QOS1;
        pub_message.retained = 0;
        delivered_token = 0;

        opts.context = &message;

        MQTTAsync_sendMessage((MQTTAsync)context, TOPIC_OUT, &pub_message, &opts);
    } else {
        MQTTAsync_freeMessage(&message);
        MQTTAsync_free(topic_name);
    }
    return 1;
}

Любое руководство, чтобы помочь мне отладить это приветствуется. Большое спасибо!

...