Как я могу в режиме реального времени с Python и MQTT? Данные отображаются только после завершения процесса - PullRequest
0 голосов
/ 07 мая 2019

У меня есть код Python для потоковой передачи данных с помощью MQTT Paho.Я хотел бы видеть данные в режиме реального времени на консоли.К сожалению, я получаю результаты только в конце процесса потоковой передачи.Я сейчас не знаю, что я делаю не так.Я использую функцию loop ()

Процесс кода выглядит следующим образом:

Я создаю соединение и указываю порт mqq, имя пользователя, последовательный порт и т. Д. Затем я читаю координатыданные для фрезерования (у меня фрезерный станок).Я помещаю результаты процесса в файл json.

Может кто-то увидеть, что я делаю неправильно?

мой класс машины

class Model_Machine(machine_Connector):
def __init__(self):
    machine_Connector.__init__(self)
    self.cmd_file_init = self.base_path + "/files/machine_init.txt"
    self.cmd_file = self.base_path + "/files/machine.txt"

def mqtt_on_message(self, client, userdata, message):
    if message.topic == "model_control":
        print(message.payload.decode("utf-8"))
        if message.payload.decode("utf-8") == "machine_start":
            self.start_machine()
        elif message.payload.decode("utf-8") == "machine_start_%s" % self.modelid:
            self.start_machine()
        elif message.payload.decode("utf-8") == "machine_init":
            self.reset()
        elif message.payload.decode("utf-8") == "machine_init_%s" % self.modelid:
            self.reset()
        elif message.payload.decode("utf-8") == "global_init":
            self.reset()
        else:
            print("Received (without doing): ", message.payload.decode("utf-8"))

def reset(self):
    self.execute_command_file(self.cmd_file_init)    
def start_machine(self):
    print("Machine START")
    self.serial_send(bytes("\r", "ascii"))
    time.sleep(1)
    # Clear serial buffer with OKs
    self.serial.reset_input_buffer()
    self.execute_command_file(self.cmd_file)

def run(self):
    # Set the init config path now that the modelid is set by main
    self.cmd_file_init = self.base_path + "/files/machine_init_%s.txt" % self.modelid
    print("RUN")
    self.mqtt_connect()
    self.serial_connect()
    time.sleep(2)
    self.serial_send(bytes("\r", "ascii"))
    time.sleep(2)
    self.serial_send(bytes("\r", "ascii"))
    time.sleep(2)
    Event().wait()

Класс подключения немного больше, поэтому я положил некоторые фрагменты класса

def mqtt_connect(self):
    mqtt_client_id = "Client_%s_%s_%d" % (self.modeltype.lower(), self.modelid, time.time()) 
    self.mqtt_client = mqtt.Client(mqtt_client_id)
    self.mqtt_client.on_connect = self.mqtt_on_connect
    self.mqtt_client.on_message = self.mqtt_on_message

    if self.mqtt_username is not None and self.mqtt_password is not None:
        self.mqtt_client.username_pw_set(
            username=self.mqtt_username,
            password=self.mqtt_password
        )

    while self.mqtt_connected != True:
        try:
            print("Attempting MQTT connection with ", self.mqtt_address, self.mqtt_port, self.mqtt_keepalive)
            self.mqtt_client.connect(self.mqtt_address, self.mqtt_port, self.mqtt_keepalive)
            self.mqtt_connected = True
        except:
            print("Could not connect. Check network.")
            time.sleep(5)
    self.mqtt_client.loop_start()
    self.mqtt_client.publish("test", "Client %s now online" % mqtt_client_id)
    self.mqtt_client.subscribe("model_control")

def subscribe(self, topic):
    self.mqtt_client.subscribe(topic)


 def parse_serial(self):
    s = self.serial
    while 42:
        c = s.read() # read one byte
        if c == b'\xa5':
            # Start byte for data input detected, decode the following
            print("Start byte detected, parsing measure")

            result_data = {} # The data object to be sent
            num_sensors = int.from_bytes(s.read(1),byteorder='little', signed=True)
            s.read(2) #number of Byte
            model_id = int.from_bytes(s.read(1),byteorder='little', signed=True)
            model_name = self.config["models"]["%d"%model_id]["modelName"]
            s.read(1)
            #package counter
            s.read(2)
            timestamp=int.from_bytes(s.read(4),byteorder='little', signed=True)
            result_data["timestamp"] = timestamp
            #reserved
            s.read(4)
            # iterate over sensors, each sensor may contain multiple data values
            for i in range(num_sensors):
                num_bytes_per_value = s.read(1)
                num_bytes_per_value_int = int.from_bytes(num_bytes_per_value, byteorder='little', signed=True)
                sensor_type_b = s.read(1)
                sensor_type_int = int.from_bytes(sensor_type_b, byteorder='little', signed=True)
                sensor_type = "%d" % sensor_type_int
                byte_array_crc =b'\x00'

                #get the sensor id as string from config, if not then skip this packet
                if "%d" % sensor_type_int in self.config["sensors"]:
                    sensor_data = {}
                    sensor_name = self.config["sensors"][sensor_type]["sensorName"]

                    # Iterate over the sensor axis dimensions:
                    for axis in self.config["sensors"][sensor_type]["dimensions"]:
                        value = int.from_bytes(s.read( axis["bytes"] ), byteorder='little', signed=True)
                        sensor_data[axis["name"]] = value

                    result_data[sensor_name] = sensor_data

                else:
                    s.read(num_bytes_per_value_int) 
                    print("unknown sensor:", "%d" % sensor_type_int)

            checksum = int.from_bytes(s.read(2),byteorder='little')
            print(checksum, sum(byte_array_crc))
            if(checksum==sum(byte_array_crc)):
                print("Checksum OK")

            dt = datetime.now()
            microseconds = time.time() 
            result_data["time_submitted"] = microseconds
            print(result_data)
            msgInfo = self.mqtt_client.publish("%s%s" % (model_name, self.modelid), json.dumps(result_data))
            print(msgInfo)
            self.clear_buffer()
        else:
            self.bytestring.append(c[0])
            bstr = self.bytestring.decode("ascii")
            if "OK" in bstr or "ok" in bstr:
                print("-> OK received")
                self.received_ok.set()
                self.serial_in_status = 0
                self.clear_buffer()
            elif "TIM" in bstr:
                print("-> TIM received")
                self.received_ok.set()
                self.serial_in_status = 1
                self.clear_buffer()
            elif len(bstr) > 50:
                print("-> SERIAL ERROR")
                print(bstr)
                self.clear_buffer()

  def execute_command_file(self, filename, p_callback=None, startline=0, **kwargs):
    self.read_command_file(filename)
    self.mqtt_wait = None # Clear wait object
    commands = self.commands
    if commands:
        for index, cmd in enumerate(commands[startline:]):
            print("Line: ", cmd)
            if cmd.startswith("(*"):
                # Comment line, skip it
                continue
            elif cmd.startswith("_"):
                if "_MQTT_" in cmd:
                    cmd = cmd.replace("_MQTT_", "").split(",")

                    if len(cmd) == 2:
                        print("MQTT Publish", cmd[0], cmd[1])
                        # Publish message
                        self.mqtt_client.publish(cmd[0], cmd[1])
                        self.mqtt_client.loop(1)
                    elif len(cmd) == 4:
                        self.subscribe(cmd[2])
                        self.mqtt_wait = {
                            "topic": cmd[2],
                            "message": cmd[3],
                            "startline": index+1+startline,
                        }
                        self.mqtt_client.publish(cmd[0], cmd[1])
                        self.mqtt_client.loop(1)
                        print("Quit execution to wait for response")
                        print(self.mqtt_wait)
                        return # quit back
            else:
                if cmd.startswith("P"):
                    if p_callback:
                        cmd = p_callback(cmd, kwargs)
                    else:
                        print("No callback defined for ", cmd)

                if cmd:
                    self.received_ok.clear()
                    self.serial_send(bytes(cmd + "\r", "ascii"))
                    self.received_ok.wait() 
                    status = self.serial_in_status
                    if status == 0:
                        pass
                    elif status == 1:
                        # Timeout error
                        self.mqtt_client.publish("model_control", "timeout")
                        return # Stop execution
                    else:
                        print("SERIAL ERROR")
        print("Command file executed")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...