Клиент mqtt не получает сообщения в случае потока и rest-api - PullRequest
1 голос
/ 15 февраля 2020

У меня есть скрипт python, основанный на flask и mqtt. Вариант использования - получить запрос через rest-api, а затем создать новый поток, который публикует некоторые сообщения на mosquitto mqtt и ожидает ответа (см. Подписаться). Моя проблема в том, что я не получаю никаких сообщений. Я думаю, что это как-то связано с потоком, потому что без потока он работает очень хорошо .. Вы знаете, в чем может быть проблема?

Спасибо в ожидании!

здесь код :

from flask import Flask, Response
import paho.mqtt.client as mqtt
from threading import Thread
import threading

app = Flask(__name__)
lock = threading.Lock()

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print("Connected with result code {0}".format(str(rc)))  # Print result of connection attempt
    client.subscribe("/mytopic")


def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    print(msg.topic)


client = mqtt.Client(client_id=client_name, clean_session=True)
client.on_connect = on_connect  # Define callback function for successful connection
client.on_message = on_message  # Define callback function for receipt of a message
client.username_pw_set(mqtt_user, mqtt_password)
client.loop_start()
client.connect(mqtt_host)    


def test(param1, param2):
   lock.acquire()
   try:

      ret = client.publish("/mytopic", "")
      while True:
            check the response from mqtt => but i don't get the response anymore
            ....
            break
    finally:
        lock.release()
    return result


@app.route('/test/check', methods=['POST'])
def check():
    global sessionId
    sessionId = sessionId + 1
    t = Thread(target=test, args=(sessionId,None))
    t.start()
    return {'id': sessionId, 'eta': 0}


if __name__ == '__main__':
    app.run(debug=True)

1 Ответ

0 голосов
/ 15 февраля 2020

Есть несколько проблем с этим.

  1. Для вызовов client.connect() и client.subscribe() необходимы итерации клиентской сети l oop для правильного завершения.
  2. Сеть l oop должна запускаться не реже одного раза в каждый период поддержки активности после установления соединения, чтобы посредник не мог отключить клиент как мертвый. Это означает, что если между запуском кода и первым запросом REST существует задержка, клиент будет отключен.

Лучше использовать функцию client.start_loop() для непрерывной работы клиентской сети MQTT l oop в фоновом режиме самостоятельно.

Вы также должны удалить вызов на client.subscribe(), который находится за пределами on_connect() обратного вызова.

РЕДАКТИРОВАТЬ: Как хешируется в комментариях / чате следующее работает. Похоже, что запуск приложения flask в режиме отладки делает некоторые странные вещи и создает несколько клиентов MQTT снова и снова с одним и тем же идентификатором клиента. Это приводит к тому, что брокер постоянно отбрасывает старые, чтобы сообщения никогда не доставлялись.

from flask import Flask, Response
import paho.mqtt.client as mqtt
import time
from threading import Thread
import threading

app = Flask(__name__)
lock = threading.Lock()
sessionId=0
cont=True

def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
print("Connected with result code {0}".format(str(rc))) # Print result of connection attempt
client.subscribe("mytopic")


def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
global cont
print(msg.topic)
cont=False


client = mqtt.Client(client_id="foo", clean_session=True)
client.on_connect = on_connect # Define callback function for successful connection
client.on_message = on_message # Define callback function for receipt of a message
#client.username_pw_set(mqtt_user, mqtt_password)
client.connect("localhost", port=1884)
client.loop_start()

def test(param1, param2):
lock.acquire()
try:
ret = client.publish("mytopic", "foo")
while cont:
time.sleep(5)
print("loop")
finally:
lock.release()

result = "foo"

return result


@app.route('/test/check', methods=['POST'])
def check():
global sessionId
sessionId = sessionId + 1
t = Thread(target=test, args=(sessionId,None))
t.start()
return {'id': sessionId, 'eta': 0}


if __name__ == '__main__':
print("started")
app.run()
...