Приложение интенсивного последовательного порта ввода / вывода: портирование из потоков, проектирование на основе очередей в асинхронный режим (а-ля Twisted) - PullRequest
4 голосов
/ 18 января 2011

Итак, я работал над приложением для клиента, которое связывается с беспроводными устройствами через последовательный (RS-232) «Мастер».В настоящее время я написал ядро ​​приложения, используя многопоточность (ниже).На #python я заметил, что консенсус, по-видимому, заключается в том, чтобы НЕ использовать потоки и использовать возможности асинхронной связи Twisted.

Мне не удалось найти хороших примеров использования витой связи с асинхронным вводом-выводом через последовательный порт.Тем не менее, я нашел Дейва Петиколаса «Twisted Введение» (спасибо nosklo), через который я сейчас работаю, но он использует сокеты вместо последовательной связи (но концепция асинхронности определенно очень хорошо объясняется).

Как бы я перенес это приложение на Twisted с помощью Threading, Queues?Есть ли какие-либо преимущества / недостатки (я заметил, что в случае зависания потока он будет BSOD системы)?

Код (msg_poller.py)

from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc

gc.enable()
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'

INITIAL_MODBUS = 0xFFFF


class Poller:
    """
    Connects to the serial port and polls nodes for data.
    Reads response from node(s) and loads that data into queue.
    Parses qdata and writes that data to database.
    """

    def __init__(self,
            port,
            baudrate,
            parity,
            rtscts,
            xonxoff,
            echo=False):
        try:
            self.serial = serial.serial_for_url(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
        except AttributeError:
            self.serial = serial.Serial(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
            self.com_data_q = None
        self.com_error_q = None
        self.livefeed = LiveDataFeed()
        self.timer = time.time()
        self.dtr_state = True
        self.rts_state = True
        self.break_state = False

    def start(self):
        self.data_q = Queue.Queue()
        self.error_q = Queue.Queue()
        com_error = get_item_from_queue(self.error_q)
        if com_error is not None:
            print 'Error %s' % (com_error)
        self.timer = time.time()
        self.alive = True

        # start monitor thread
        #
        self.mon_thread = threading.Thread(target=self.reader)
        self.mon_thread.setDaemon(1)
        self.mon_thread.start()

        # start sending thread
        #
        self.trans_thread = threading.Thread(target=self.writer)
        self.trans_thread.setDaemon(1)
        self.trans_thread.start()

    def stop(self):
        try:
            self.alive = False
            self.serial.close()
        except (KeyboardInterrupt, SystemExit):
            self.alive = False

    def reader(self):
        """
        Reads data from the serial port using self.mon_thread.
        Displays that data on the screen.
        """
        from rmsg_format import message_crc, message_format
        while self.alive:
            try:
                while self.serial.inWaiting() != 0:

                # Read node data from the serial port. Data should be 96B.

                    data = self.serial.read(96)
                    data += self.serial.read(self.serial.inWaiting())

                    if len(data) > 0:

                        # Put data in to the data_q object
                        self.data_q.put(data)
                        if len(data) == 96:
                            msg = self.data_q.get()

                            pw = ProtocolWrapper(
                                        header=PROTOCOL_HEADER,
                                        footer=PROTOCOL_FOOTER,
                                        dle=PROTOCOL_DLE)
                            status = map(pw.input, msg)

                            if status[-1] == ProtocolStatus.IN_MSG:
                                # Feed all the bytes of 'msg' sequentially into pw.input

                                # Parse the received CRC into a 16-bit integer
                                rec_crc = message_crc.parse(msg[-4:]).crc

                                # Compute the CRC on the message
                                calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
                                from datetime import datetime
                                ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
                                if rec_crc != calc_crc:
                                    print ts
                                    print 'ERROR: CRC Mismatch'
                                    print msg.encode('hex')
                                else:
                                    #msg = message_format.parse(msg[1:])
                                    #print msg.encode('hex') + "\r\n"
                                    msg = message_format.parse(msg[1:])
                                    print msg
                                    #return msg
                                    gc.collect()
                    time.sleep(.2)
            except (KeyboardInterrupt, SystemExit, Exception, TypeError):
                self.alive = False
                self.serial.close()
                raise

    def writer(self):
        """
        Builds the packet to poll each node for data.
        Writes that data to the serial port using self.trans_thread
        """
        import time
        try:
            while self.alive:
                try:
                    dest_module_code = ['DRILLRIG',
                            'POWERPLANT',
                            'GENSET',
                            'MUDPUMP']
                    dest_ser_no = lambda x: x + 1
                    for code in dest_module_code:
                        if code != 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='%s' % (code),
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                        elif code == 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(1),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                except (KeyboardInterrupt, SystemExit):
                    self.alive = False
                    self.serial.close()
                    raise
        except (KeyboardInterrupt, SystemExit):
            self.alive = False
            self.serial.close()
            raise


def main():
    poller = Poller(
            port='COM4',
            baudrate=115200,
            parity=serial.PARITY_NONE,
            rtscts=0,
            xonxoff=0,
            )
    poller.start()
    poller.reader()
    poller.writer()
    poller.stop()
if __name__ == '__main__':
    main()                                                                      

1 Ответ

7 голосов
/ 18 января 2011

Очень сложно (если не невозможно) написать прямую программу отображения один-к-одному между подходом потоков / очереди и той, которая использует витой.

Я бы предложил, чтобы получить представление о витой и ее реакторной манере, как использовать протокол и методы, специфичные для протокола. Подумайте об этом, поскольку все асинхронные вещи, которые вы явно кодировали с использованием потоков и очередей, предоставляются вам бесплатно, когда вы используете отложенное использование витой.

Twisted, похоже, поддерживает SerialPort через свой реактор, используя транспортный класс SerialPort, и базовая структура выглядит примерно так.

from twisted.internet import reactor
from twisted.internet.serialport import SerialPort

SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate))
reactor.run() 

В YourProtocolClass () вы будете обрабатывать различные события, которые соответствуют вашим требованиям к последовательному порту. Каталог doc / core / examples содержит примеры, такие как gpsfix.py и mouse.py.

...