Paho MQTT, пока l oop блокирует публикацию sh другому клиенту MQTT - PullRequest
0 голосов
/ 13 апреля 2020

Моя цель - правильная обработка сообщений MQTT в Backend между некоторыми модулями из IoT. Я решил реализовать класс эмулятора модуля, который будет получать мои запросы или отправлять ответ.

Вторая проблема заключается в том, что мне нужно дождаться ACK или ERR модуля после публикации sh. Для этой проблемы я создал список ack_blocker, подобный следующему:

[
    {
        "module_mac": "mac",
        "blocked": False,
        "message": {}
    },
    {
        "module_mac": "mac",
        "blocked": False,
        "message": {}
    }
]

Поэтому, когда я отправляю сообщение в указанный модуль c, для атрибута заблокированного будет установлено значение True, и я буду ждать, пока l oop после публикации сообщение. С другой стороны, мое опубликованное сообщение должно прийти к клиенту MQTT эмулятора, где он будет анализировать данные и ответы ERR или ACK. При получении возвращенного сообщения я верну заблокированному атрибуту значение «Ложь», и l oop будет завершен и вернет сообщение в бэкэнд-просмотр с ошибкой или соответствующим сообщением.

Проблема в том, что опубликованное сообщение от бэкенда никогда не будет прибыть на эмулятор MQTT клиента ИДК почему, но в моем l oop есть тайм-аут (10 с), и по истечении этого времени я должен выдать ошибку, что модуль не отвечает. Я тщательно отлаживал весь процесс, и когда бэкэнд выдает ошибку, мой клиент-эмулятор наконец получит сообщение. Я запускаю это больше раз, и это будет происходить именно так каждый раз. Поэтому я думаю, что l oop как-то блокирует отправку этого сообщения.

Это моя реализация l oop:

    def send_message(self, mac: str, message: str):
        self.publish(mac, message)
        end_time = time.time() + self.timeout

        while True:
            module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)

            if not module_ack_blocker.get('blocked'):
                response = module_ack_blocker.get('message')

                if response.get('result') == 'OK':
                    logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
                    return response
                elif response.get('result') == 'ERROR':
                    raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)

            if time.time() > end_time:
                raise MQTTException('Module is not responding.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)

Итак, как вы видите, сначала я публикую sh сообщение. После этого я вычислю время ожидания и начну l oop. В l oop я сначала смотрю на правильный dict в списке блокировщиков ack (как я упоминал ранее). Я спрошу, если это не заблокировано. И после этого, если у меня еще есть время на перерыв.

Мой клиент эмулятора mqtt выглядит следующим образом:

class MqttClientEmulator(object):
    def __init__(self):
        self.app = None
        self.broker_host = None
        self.broker_port = None
        self.keep_alive = None
        self.timeout = None

        self.client = mqtt.Client(client_id='brewmaster_client_emulator')

    def init(self, app):
        self.broker_host = os.getenv('BROKER_HOST')
        self.broker_port = int(os.getenv('BROKER_PORT'))
        self.keep_alive = int(os.getenv('MQTT_KEEPALIVE'))
        self.timeout = int(os.getenv('MQTT_TIMEOUT'))
        self.app = app

        self.client.on_message = self.on_message

    def on_message(self, client, userdata, msg):
        topic = msg.topic
        string_message = str(msg.payload.decode('utf-8'))
        dict_message = json.loads(string_message)

        # Request result
        if dict_message.get('device_uuid'):
            print(dict_message)
            response = {
                "module_mac": topic,
                "sequence_number": 123,
                "result": "OK",
                "details": ""
            }
            time.sleep(1)   # Just for time reserve (this code will be more complicated in future)
            self.publish('brewmaster-backend', json.dumps(response))

    def connect(self):
        self.client.connect(self.broker_host, self.broker_port, self.keep_alive)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

    def subscribe(self, name):
        self.client.subscribe(name)

    def publish(self, topic, message):
        self.client.publish(topic, message)

Я также пробовал потоки, и это не имело никакого эффекта.

1 Ответ

0 голосов
/ 14 апреля 2020

Хорошо, мне нужно было заглянуть глубже в библиотеку paho MQTT. Есть функция wait_for_publi sh для объекта MQTTMessageInfo. И если вы посмотрите на _condition-объект, то здесь уже реализована блокировка с семафорами. Поэтому все, что мне нужно, это изменить свой while l oop в методе send_message моего клиента MQTT (как показано в вопросе) на что-то вроде этого:

    def send_message(self, mac: str, message: str):
        result = self.publish(mac, message)
        end_time = time.time() + self.timeout

        if result.rc == mqtt.MQTT_ERR_QUEUE_SIZE:
            raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
        with result._condition:
            while True:
                module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)

                if not module_ack_blocker.get('blocked'):
                    response = module_ack_blocker.get('message')

                    if response.get('result') == 'OK':
                        logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
                        return response
                    elif response.get('result') == 'ERROR':
                        raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)

                if time.time() > end_time:
                    raise MQTTException('Daný modul neodpovedá.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
                result._condition.wait(1)

Где результат - объект MQTTMessageInfo и _condition.wait (1 ) ожидает с таймаутом 1 секунда. Таким образом, в основном, когда ожидается, все другие процессы работают и через 1 секунду начнется другой, в то время как l oop итерация с проверкой, если сообщение уже пришло.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...