Paho MQTT "не удалось получить на сокете: [Errno 32] Broken pipe" - PullRequest
0 голосов
/ 11 апреля 2020

У меня есть клиент Paho MQTT, настроенный для приема сообщений, который выглядит следующим образом:

class PrimaryListener:

    def __init__(self):
        self.client = mqtt.Client("paho-test-subscriber", False)
        self.client.on_connect= self.on_connect 
        self.client.on_message= self.on_message   
        self.client.on_disconnect = self.on_disconnect     

        self.client.connect(msg_defn.broker_ip, msg_defn.broker_port)
        self.client.subscribe("test-topic")

    def on_connect(self, client, userdata, flags, rc):
        if rc==0:
            print("connected OK Returned code=",rc,flush=True)
        else:
            print("Bad connection Returned code=",rc, flush=True)

    def on_message(self, client, userdata, message):
        msg_str = message.payload.decode("utf-8")
        print("message received : " , str(msg_str))
        print("message topic : ", message.topic)

    def on_disconnect(self, client, userdata,rc=0):
        self.client.loop_stop()


    def subscribe(self): 
        self.client.loop_forever()

if __name__ == "__main__":
    primaryListener = PrimaryListener()
    primaryListener.subscribe()

каждый из издателей выглядит так, пытаясь отправлять сообщения каждые десять секунд ... .

class Publisher:

    def __init__(self):
        self.client = mqtt.Client("paho-test-publisher", False)
        self.client.on_log = self.on_log  
        self.client.on_connect = self.on_connect 
        self.client.connect(msg_defn.broker_ip, msg_defn.broker_port)
        self.publishing_increment = 7 

    def on_log(self,client, userdata, level, buf):
        print("log: ", buf)

    def on_connect(self,client, userdata, flags, rc):
        if rc==0:
            print("connected OK Returned code=",rc, flush=True)
        else:
            print("Bad connection Returned code=",rc, flush=True)

    def send_message(self, log_str):
        file_dict = json.loads(log_str)
        for item in file_dict: 
            self.client.publish("test-topic", json.dumps(item))   
            time.sleep(self.publishing_increment)

    def publishFromFile(self,file_name):
        with open(file_name, "r") as jsonfile:
            file_str = jsonfile.read()
        file_dict = json.loads(file_str)
        numStr = str(randint(0, 10))
        while True:
            for item in file_dict: 
                self.client.publish("test-topic", numStr)  #json.dumps(item))   
                time.sleep(self.publishing_increment)
            time.sleep(10)

if __name__ == "__main__":
    publisher = Publisher()
    publisher.publishFromFile("disconnected_test.txt")

Я пытаюсь проверить это. Когда я запускаю файл PrimaryListener и запускаю один Publisher в другом окне терминала, он работает нормально. Когда я пытаюсь запустить второй Publisher, первый Publisher регистрирует строку вывода ....

log:  failed to receive on socket: [Errno 32] Broken pipe

, а затем прекращает отправку сообщений. Что я тут не так делаю?

Ответы [ 2 ]

1 голос
/ 11 апреля 2020

MQTT требует, чтобы у каждого клиента был уникальный идентификатор клиента.

Точное поведение, когда второй клиент подключается с тем же идентификатором, остается за брокером. То, что вы видите, является обычным: брокер запускает первое и принимает соединение от второго.

Вы пытаетесь соединить обоих издателей с одинаковым идентификатором: "paho-test- издатель ".

Если вам не нужно указывать имя идентификатора c, вы можете подключиться с пустым идентификатором" ", и брокер назначит случайное имя для вашего клиента.

0 голосов
/ 13 апреля 2020

Это именно та проблема, которая у меня была, а затем исправление сделало все различия. Не используйте 'client.connect ()'

...