Я создаю модуль python3
, который учитывает:
- Библиотека MQTT из
paho.mqtt.client
- Чтение данных GPS и анализ с использованием
pynmea2 and serial
- чтение конкретной информации из
Popen and PIPE
подпроцессов - чтение / запись приложения в
InfluxDB
структура
.
|-- myApp
| |-- mqtt
| |-- db
| |-- gps
| |-- n2k
|-- main.py
В mqtt
я создалкласс-обёртка для mqtt.Client
, где в вышеупомянутом main.py
нужно использовать только:
client = myMqtt()
client.run() # <--- run() has `loop_forever()`
loop_forever()
код от paho.mqtt
для gps
и db
Я подумываю о создании подобных оболочек для моего приложения.
Для n2k
У меня есть простая оболочка, которая вызывает предустановленные двоичные файлы (actisense-serial
и analyzer
) на порт /dev/ttyUSB0
следующим образом:
class N2KParser:
def __init__(self, parser=None):
self.cfg = get_configuration('n2k') # this has all the configuration (/dev/ttyUSB0)
self.actisense_process = Popen(['actisense-serial', '-r', self.cfg['port']], stdout=PIPE)
self.analyzer_proc = Popen(['analyzer', '-json'], stdin=self.actisense_process.stdout,
stdout=PIPE, stderr=PIPE)
def read(self):
logger.info('Reading NMEA2000 via Actisense-NGT1 Gateway')
while self.analyzer_proc.poll() is None:
n2k_data = self.analyzer_proc.stdout.readline().decode('utf-8')
try:
n2k_dict = json.loads(n2k_data)
except Exception as e:
raise e
self.actisense_process.close()
self.analyzer_proc.close()
def close():
logger.info('Closing the Parser for N2K')
self.actisense_process.close()
self.analyzer_proc.close()
В файле main.py
, аналогичном mqtt
, я добавляю экземпляр n2k
после client.run()
from myApp.cloud.Client import CloudMqtt
from myApp.db.dbClient import influxInstance
from myApp.nmea2k.n2kClient import N2KParser
import time
if __name__ == '__main__':
try:
db = influxInstance()
except Exception as e:
raise e
try:
client = CloudMqtt()
rc = client.run()
except Exception as e:
raise e
db.close()
client.loop_stop()
client.disconnect()
try:
parser = N2KParser() # <--- this is not executed since blocked by `client.run()`
except Exception as e:
raise e
parser.close()
Как в комментарии:
- , поскольку метод
run()
использует loop_forever()
, который использует многопоточность и заблокирован навсегда, поэтому N2KParser
не создаетсяd звонил.
Каков оптимальный способ структурировать вызов таких подмодулей в моем myApp
, чтобы они все работали параллельно, не блокируя друг друга?
PS
, поскольку я использую logging
для всех подмодулей: здесь вывод на stdout
INFO:myApp.db.dbClient:Client instance to DB created INFO:myApp.cloud.Client:MQTT client instance created DEBUG:myApp.cloud.Client:Sending CONNECT (u1, p1, wr0, wq0, wf0, c0, k60) client_id=b'f628a6d8-188a-403e-90ef-da72ec1474b6'DEBUG:umg.cloud.Client:Received CONNACK (1, 0) DEBUG:myApp.cloud.Client:RC: 0 INFO:myApp.cloud.Client:connected to cloud DEBUG:myApp.cloud.Client:Sending PINGREQ DEBUG:myApp.cloud.Client:Received PINGRESP