Моя цель - правильная обработка сообщений MQTT в Backend между некоторыми модулями из IoT. Я решил реализовать класс эмулятора модуля, который будет получать мои запросы или отправлять ответ.
Вторая проблема заключается в том, что мне нужно дождаться ACK или ERR модуля после публикации sh. Для этой проблемы я создал список ack_blocker, подобный следующему:
[
{
"module_mac": "mac",
"blocked": False,
"message": {}
},
{
"module_mac": "mac",
"blocked": False,
"message": {}
}
]
Поэтому, когда я отправляю сообщение в указанный модуль c, для атрибута заблокированного будет установлено значение True, и я буду ждать, пока l oop после публикации сообщение. С другой стороны, мое опубликованное сообщение должно прийти к клиенту MQTT эмулятора, где он будет анализировать данные и ответы ERR или ACK. При получении возвращенного сообщения я верну заблокированному атрибуту значение «Ложь», и l oop будет завершен и вернет сообщение в бэкэнд-просмотр с ошибкой или соответствующим сообщением.
Проблема в том, что опубликованное сообщение от бэкенда никогда не будет прибыть на эмулятор MQTT клиента ИДК почему, но в моем l oop есть тайм-аут (10 с), и по истечении этого времени я должен выдать ошибку, что модуль не отвечает. Я тщательно отлаживал весь процесс, и когда бэкэнд выдает ошибку, мой клиент-эмулятор наконец получит сообщение. Я запускаю это больше раз, и это будет происходить именно так каждый раз. Поэтому я думаю, что l oop как-то блокирует отправку этого сообщения.
Это моя реализация l oop:
def send_message(self, mac: str, message: str):
self.publish(mac, message)
end_time = time.time() + self.timeout
while True:
module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)
if not module_ack_blocker.get('blocked'):
response = module_ack_blocker.get('message')
if response.get('result') == 'OK':
logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
return response
elif response.get('result') == 'ERROR':
raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)
if time.time() > end_time:
raise MQTTException('Module is not responding.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
Итак, как вы видите, сначала я публикую sh сообщение. После этого я вычислю время ожидания и начну l oop. В l oop я сначала смотрю на правильный dict в списке блокировщиков ack (как я упоминал ранее). Я спрошу, если это не заблокировано. И после этого, если у меня еще есть время на перерыв.
Мой клиент эмулятора mqtt выглядит следующим образом:
class MqttClientEmulator(object):
def __init__(self):
self.app = None
self.broker_host = None
self.broker_port = None
self.keep_alive = None
self.timeout = None
self.client = mqtt.Client(client_id='brewmaster_client_emulator')
def init(self, app):
self.broker_host = os.getenv('BROKER_HOST')
self.broker_port = int(os.getenv('BROKER_PORT'))
self.keep_alive = int(os.getenv('MQTT_KEEPALIVE'))
self.timeout = int(os.getenv('MQTT_TIMEOUT'))
self.app = app
self.client.on_message = self.on_message
def on_message(self, client, userdata, msg):
topic = msg.topic
string_message = str(msg.payload.decode('utf-8'))
dict_message = json.loads(string_message)
# Request result
if dict_message.get('device_uuid'):
print(dict_message)
response = {
"module_mac": topic,
"sequence_number": 123,
"result": "OK",
"details": ""
}
time.sleep(1) # Just for time reserve (this code will be more complicated in future)
self.publish('brewmaster-backend', json.dumps(response))
def connect(self):
self.client.connect(self.broker_host, self.broker_port, self.keep_alive)
self.client.loop_start()
def disconnect(self):
self.client.loop_stop()
self.client.disconnect()
def subscribe(self, name):
self.client.subscribe(name)
def publish(self, topic, message):
self.client.publish(topic, message)
Я также пробовал потоки, и это не имело никакого эффекта.