У меня есть код Python для потоковой передачи данных с помощью MQTT Paho.Я хотел бы видеть данные в режиме реального времени на консоли.К сожалению, я получаю результаты только в конце процесса потоковой передачи.Я сейчас не знаю, что я делаю не так.Я использую функцию loop ()
Процесс кода выглядит следующим образом:
Я создаю соединение и указываю порт mqq, имя пользователя, последовательный порт и т. Д. Затем я читаю координатыданные для фрезерования (у меня фрезерный станок).Я помещаю результаты процесса в файл json.
Может кто-то увидеть, что я делаю неправильно?
мой класс машины
class Model_Machine(machine_Connector):
def __init__(self):
machine_Connector.__init__(self)
self.cmd_file_init = self.base_path + "/files/machine_init.txt"
self.cmd_file = self.base_path + "/files/machine.txt"
def mqtt_on_message(self, client, userdata, message):
if message.topic == "model_control":
print(message.payload.decode("utf-8"))
if message.payload.decode("utf-8") == "machine_start":
self.start_machine()
elif message.payload.decode("utf-8") == "machine_start_%s" % self.modelid:
self.start_machine()
elif message.payload.decode("utf-8") == "machine_init":
self.reset()
elif message.payload.decode("utf-8") == "machine_init_%s" % self.modelid:
self.reset()
elif message.payload.decode("utf-8") == "global_init":
self.reset()
else:
print("Received (without doing): ", message.payload.decode("utf-8"))
def reset(self):
self.execute_command_file(self.cmd_file_init)
def start_machine(self):
print("Machine START")
self.serial_send(bytes("\r", "ascii"))
time.sleep(1)
# Clear serial buffer with OKs
self.serial.reset_input_buffer()
self.execute_command_file(self.cmd_file)
def run(self):
# Set the init config path now that the modelid is set by main
self.cmd_file_init = self.base_path + "/files/machine_init_%s.txt" % self.modelid
print("RUN")
self.mqtt_connect()
self.serial_connect()
time.sleep(2)
self.serial_send(bytes("\r", "ascii"))
time.sleep(2)
self.serial_send(bytes("\r", "ascii"))
time.sleep(2)
Event().wait()
Класс подключения немного больше, поэтому я положил некоторые фрагменты класса
def mqtt_connect(self):
mqtt_client_id = "Client_%s_%s_%d" % (self.modeltype.lower(), self.modelid, time.time())
self.mqtt_client = mqtt.Client(mqtt_client_id)
self.mqtt_client.on_connect = self.mqtt_on_connect
self.mqtt_client.on_message = self.mqtt_on_message
if self.mqtt_username is not None and self.mqtt_password is not None:
self.mqtt_client.username_pw_set(
username=self.mqtt_username,
password=self.mqtt_password
)
while self.mqtt_connected != True:
try:
print("Attempting MQTT connection with ", self.mqtt_address, self.mqtt_port, self.mqtt_keepalive)
self.mqtt_client.connect(self.mqtt_address, self.mqtt_port, self.mqtt_keepalive)
self.mqtt_connected = True
except:
print("Could not connect. Check network.")
time.sleep(5)
self.mqtt_client.loop_start()
self.mqtt_client.publish("test", "Client %s now online" % mqtt_client_id)
self.mqtt_client.subscribe("model_control")
def subscribe(self, topic):
self.mqtt_client.subscribe(topic)
def parse_serial(self):
s = self.serial
while 42:
c = s.read() # read one byte
if c == b'\xa5':
# Start byte for data input detected, decode the following
print("Start byte detected, parsing measure")
result_data = {} # The data object to be sent
num_sensors = int.from_bytes(s.read(1),byteorder='little', signed=True)
s.read(2) #number of Byte
model_id = int.from_bytes(s.read(1),byteorder='little', signed=True)
model_name = self.config["models"]["%d"%model_id]["modelName"]
s.read(1)
#package counter
s.read(2)
timestamp=int.from_bytes(s.read(4),byteorder='little', signed=True)
result_data["timestamp"] = timestamp
#reserved
s.read(4)
# iterate over sensors, each sensor may contain multiple data values
for i in range(num_sensors):
num_bytes_per_value = s.read(1)
num_bytes_per_value_int = int.from_bytes(num_bytes_per_value, byteorder='little', signed=True)
sensor_type_b = s.read(1)
sensor_type_int = int.from_bytes(sensor_type_b, byteorder='little', signed=True)
sensor_type = "%d" % sensor_type_int
byte_array_crc =b'\x00'
#get the sensor id as string from config, if not then skip this packet
if "%d" % sensor_type_int in self.config["sensors"]:
sensor_data = {}
sensor_name = self.config["sensors"][sensor_type]["sensorName"]
# Iterate over the sensor axis dimensions:
for axis in self.config["sensors"][sensor_type]["dimensions"]:
value = int.from_bytes(s.read( axis["bytes"] ), byteorder='little', signed=True)
sensor_data[axis["name"]] = value
result_data[sensor_name] = sensor_data
else:
s.read(num_bytes_per_value_int)
print("unknown sensor:", "%d" % sensor_type_int)
checksum = int.from_bytes(s.read(2),byteorder='little')
print(checksum, sum(byte_array_crc))
if(checksum==sum(byte_array_crc)):
print("Checksum OK")
dt = datetime.now()
microseconds = time.time()
result_data["time_submitted"] = microseconds
print(result_data)
msgInfo = self.mqtt_client.publish("%s%s" % (model_name, self.modelid), json.dumps(result_data))
print(msgInfo)
self.clear_buffer()
else:
self.bytestring.append(c[0])
bstr = self.bytestring.decode("ascii")
if "OK" in bstr or "ok" in bstr:
print("-> OK received")
self.received_ok.set()
self.serial_in_status = 0
self.clear_buffer()
elif "TIM" in bstr:
print("-> TIM received")
self.received_ok.set()
self.serial_in_status = 1
self.clear_buffer()
elif len(bstr) > 50:
print("-> SERIAL ERROR")
print(bstr)
self.clear_buffer()
def execute_command_file(self, filename, p_callback=None, startline=0, **kwargs):
self.read_command_file(filename)
self.mqtt_wait = None # Clear wait object
commands = self.commands
if commands:
for index, cmd in enumerate(commands[startline:]):
print("Line: ", cmd)
if cmd.startswith("(*"):
# Comment line, skip it
continue
elif cmd.startswith("_"):
if "_MQTT_" in cmd:
cmd = cmd.replace("_MQTT_", "").split(",")
if len(cmd) == 2:
print("MQTT Publish", cmd[0], cmd[1])
# Publish message
self.mqtt_client.publish(cmd[0], cmd[1])
self.mqtt_client.loop(1)
elif len(cmd) == 4:
self.subscribe(cmd[2])
self.mqtt_wait = {
"topic": cmd[2],
"message": cmd[3],
"startline": index+1+startline,
}
self.mqtt_client.publish(cmd[0], cmd[1])
self.mqtt_client.loop(1)
print("Quit execution to wait for response")
print(self.mqtt_wait)
return # quit back
else:
if cmd.startswith("P"):
if p_callback:
cmd = p_callback(cmd, kwargs)
else:
print("No callback defined for ", cmd)
if cmd:
self.received_ok.clear()
self.serial_send(bytes(cmd + "\r", "ascii"))
self.received_ok.wait()
status = self.serial_in_status
if status == 0:
pass
elif status == 1:
# Timeout error
self.mqtt_client.publish("model_control", "timeout")
return # Stop execution
else:
print("SERIAL ERROR")
print("Command file executed")