Причиной ошибки многопоточности в приложении MQTT (Python)? - PullRequest
0 голосов
/ 18 января 2019

В моем коде я создаю потоки, которые publish.single несколько раз в MQTT-соединении. Однако эта ошибка возникает, и я не могу понять или найти ее происхождение. Единственный раз, когда он упоминает мой код с line 75, in send_on_sensor.

Exception in thread Thread-639:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/Users//PycharmProjects//V3_multiTops/mt_GenPub.py", line 75, in send_on_sensor
    publish.single(topic, payload, hostname=hostname)
  File "/Users//PycharmProjects//venv/lib/python3.7/site-packages/paho/mqtt/publish.py", line 223, in single
    protocol, transport)
  File "/Users//PycharmProjects//venv/lib/python3.7/site-packages/paho/mqtt/publish.py", line 159, in multiple
    client.connect(hostname, port, keepalive)
  File "/Users//PycharmProjects//venv/lib/python3.7/site-packages/paho/mqtt/client.py", line 839, in connect
    return self.reconnect()
  File "/Users//PycharmProjects//venv/lib/python3.7/site-packages/paho/mqtt/client.py", line 962, in reconnect
    sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socket.py", line 727, in create_connection
    raise err
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socket.py", line 716, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 61] Connection refused

Это упомянутая часть кода. Брошенная строка 75 - это строка с time.sleep(delay). Этот метод будет вызываться в новом потоке всякий раз, когда должен быть отправлен новый набор данных (в виде очереди точек).

def send_on_sensor(q, topic, delay):
    while q.empty is not True:
        payload = json.dumps(q.get())
        publish.single(topic, payload, hostname=hostname)
        time.sleep(delay)

У меня такое ощущение, что я делаю что-то, что не "потокобезопасно" ?! Также эта проблема возникает особенно, когда delay - короткий интервал (<1 с). Из моего вывода я вижу, что следующий набор данных (100 баллов) начнет отправку в новом потоке до того, как первый завершит отправку. Я могу исправить это, а также эту ошибку, увеличив временной интервал между двумя наборами данных. Например. если я определю задержку между наборами, используя это соотношение <code>set_delay = 400 * point_delay, я могу смело использовать delay 0,1 с. Однако то же самое отношение не будет работать для меньших задержек, поэтому это решение действительно не удовлетворяет меня.

Что я могу сделать с этой проблемой? Я действительно хочу, чтобы мой delay был ниже 0,1 с и смог отрегулировать его.

EDIT

это метод, который создает потоки:

def send_dataset(data, labels, secs=0):    
    qs = []
    for i in range(8):
        qs.append(queue.Queue())

    for value in data:
        msg = {
            "key":       value,
        }

        # c is set accordingly

        qs[c].put(msg)

    for q in qs:
        topic = sensors[qs.index(q)]
        t = threading.Thread(target=send_on_sensor, args=(q, topic, secs))
        t.start()
        time.sleep(secs)

и здесь я запускаю все методы с

output_interval = 0.01

while True:

    X, y = give_dataset()

    send_dataset(X, y, output_interval)
    time.sleep(output_interval * 2000)

1 Ответ

0 голосов
/ 18 января 2019

Несмотря на то, что вы добавили дополнительный код, он не показывает много. Тем не менее, у меня есть опыт, когда подобное происходит со мной. Я делал тяжелое многопоточное приложение с MQTT, и его вполне удалось сохранить. Не совсем, но это так.

Причиной возникновения ошибки при уменьшении задержки является то, что у вас ОДИН клиент. Публикуя сообщение (я не уверен, потому что я не вижу ваш код), вы подключаетесь, отправляете сообщение и отключаете !. Поскольку вы используете этот процесс, вы, скорее всего, отправляете одно сообщение (все еще в процессе) и собираетесь опубликовать новое в новой теме. Однако первый поток завершается и отключает клиента. Новый поток пытается опубликовать, но вы не можете, потому что предыдущий поток отключил вас.

Решение:

1) Не отключать клиента при публикации

2) Рискованно, и вам нужно больше кода: Для каждой публикации создайте нового клиента, но убедитесь, что обрабатываете это правильно. Это означает: создавать клиента, публиковать и отключать снова и снова, но убедитесь, что вы правильно закрыли соединения и удалили клиентов, чтобы не хранить мертвые клиенты

3) решение 2) - попробуйте сделать функцию, которая будет делать все - создать клиента, подключиться и опубликовать и умирает после его окончания. Если вы используете такую ​​функцию, я думаю, вам не придется заботиться о проблемах, возникающих в решении 2

Обновление:

Если ваша проблема в чем-то другом, я все же думаю, что это не из-за самих потоков, а из-за того, что несколько потоков пытаются управлять чем-то, что должно управляться только одним потоком - например, клиентским объектом

Обновление: код шаблона

Имейте в виду, что это мой старый код, и я больше его не использую, потому что моим приложениям нужно определенное отношение к потоку и т. Д., Поэтому я переписываю его для каждого приложения отдельно. Но этот работает как прелесть для непоточных приложений и, возможно, для многопоточных. Он может публиковать только с qos = 0

import paho.mqtt.client as mqtt
import json
# Define Variables
MQTT_BROKER = ""
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 5
MQTT_TOPIC = ""




class pub:
    def __init__(self,MQTT_BROKER,MQTT_PORT,MQTT_KEEPALIVE_INTERVAL,MQTT_TOPIC,transport = ''):
        self.MQTT_TOPIC = MQTT_TOPIC
        self.MQTT_BROKER =MQTT_BROKER
        self.MQTT_PORT = MQTT_PORT
        self.MQTT_KEEPALIVE_INTERVAL = MQTT_KEEPALIVE_INTERVAL
        # Initiate MQTT Client
        if transport == 'websockets':
            self.mqttc = mqtt.Client(transport='websockets')

        else:
            self.mqttc = mqtt.Client()
        # Register Event Handlers
        self.mqttc.on_publish = self.on_publish
        self.mqttc.on_connect = self.on_connect
        self.connect()

    # Define on_connect event Handler
    def on_connect(self,mosq, obj, rc):
            print("mqtt.thingstud.io")
    # Define on_publish event Handler
    def on_publish(self,client, userdata, mid):
            print("Message Published...")
    def publish(self,MQTT_MSG):
        MQTT_MSG = json.dumps(MQTT_MSG)

        # Publish message to MQTT Topic 
        self.mqttc.publish(self.MQTT_TOPIC,MQTT_MSG)



        # Disconnect from MQTT_Broker
    def connect(self):
        self.mqttc.connect(self.MQTT_BROKER, self.MQTT_PORT, self.MQTT_KEEPALIVE_INTERVAL) 


    def disconnect(self):
        self.mqttc.disconnect()
p = pub(MQTT_BROKER,MQTT_PORT,MQTT_KEEPALIVE_INTERVAL,MQTT_TOPIC)
p.publish('some messages')
p.publish('more messages')

Обратите внимание, что при создании объекта я подключаюсь автоматически, но я не отключаюсь. Это то, что вы должны сделать вручную

Я предлагаю вам попытаться создать столько объектов pub , сколько у вас есть датчиков, и публиковать их вместе с ними.

...