Потоки в сложном приложении, которое использует Popen, Serial и MQTT - PullRequest
0 голосов
/ 11 сентября 2018

Я создаю модуль python3, который учитывает:

  1. Библиотека MQTT из paho.mqtt.client
  2. Чтение данных GPS и анализ с использованием pynmea2 and serial
  3. чтение конкретной информации из Popen and PIPE подпроцессов
  4. чтение / запись приложения в InfluxDB

структура

.
|-- myApp
|   |-- mqtt
|   |-- db
|   |-- gps
|   |-- n2k
|-- main.py

В mqtt я создалкласс-обёртка для mqtt.Client, где в вышеупомянутом main.py нужно использовать только:

  client = myMqtt()
  client.run() # <--- run() has `loop_forever()`

loop_forever() код от paho.mqtt

для gps и db Я подумываю о создании подобных оболочек для моего приложения.

Для n2k У меня есть простая оболочка, которая вызывает предустановленные двоичные файлы (actisense-serial и analyzer) на порт /dev/ttyUSB0 следующим образом:

class N2KParser:
    def __init__(self, parser=None):
        self.cfg = get_configuration('n2k') # this has all the configuration (/dev/ttyUSB0)

        self.actisense_process = Popen(['actisense-serial', '-r', self.cfg['port']], stdout=PIPE)
        self.analyzer_proc = Popen(['analyzer', '-json'], stdin=self.actisense_process.stdout,
                                    stdout=PIPE, stderr=PIPE)
    def read(self):
        logger.info('Reading NMEA2000 via Actisense-NGT1 Gateway')
        while self.analyzer_proc.poll() is None:
            n2k_data = self.analyzer_proc.stdout.readline().decode('utf-8')
            try:
                n2k_dict = json.loads(n2k_data)
            except Exception as e:
                raise e
                self.actisense_process.close()
                self.analyzer_proc.close()

    def close():
        logger.info('Closing the Parser for N2K')
        self.actisense_process.close()
        self.analyzer_proc.close()

В файле main.py, аналогичном mqtt, я добавляю экземпляр n2k после client.run()

from myApp.cloud.Client import CloudMqtt
from myApp.db.dbClient import influxInstance
from myApp.nmea2k.n2kClient import N2KParser
import time

if __name__ == '__main__':

    try:
        db = influxInstance()
    except Exception as e:
        raise e
    try:
        client = CloudMqtt()
        rc = client.run()
    except Exception as e:
        raise e
        db.close()
        client.loop_stop()
        client.disconnect()
    try:
        parser = N2KParser() # <--- this is not executed since blocked by `client.run()`
    except Exception as e:
        raise e
        parser.close()

Как в комментарии:

  • , поскольку метод run() использует loop_forever(), который использует многопоточность и заблокирован навсегда, поэтому N2KParser не создаетсяd звонил.

Каков оптимальный способ структурировать вызов таких подмодулей в моем myApp, чтобы они все работали параллельно, не блокируя друг друга?

PS

, поскольку я использую logging для всех подмодулей: здесь вывод на stdout

INFO:myApp.db.dbClient:Client instance to DB created INFO:myApp.cloud.Client:MQTT client instance created DEBUG:myApp.cloud.Client:Sending CONNECT (u1, p1, wr0, wq0, wf0, c0, k60) client_id=b'f628a6d8-188a-403e-90ef-da72ec1474b6'DEBUG:umg.cloud.Client:Received CONNACK (1, 0) DEBUG:myApp.cloud.Client:RC: 0 INFO:myApp.cloud.Client:connected to cloud DEBUG:myApp.cloud.Client:Sending PINGREQ DEBUG:myApp.cloud.Client:Received PINGRESP

...