Чтение из канала (pipe) с помощью pexpect в Linux - PullRequest
       23

Чтение из канала (pipe) с помощью pexpect в Linux

0 голосов
/ 17 сентября 2018

Я публикую новый вопрос, связанный с предыдущим, из-за проблемы с получением сообщений из очереди. Вот код (спасибо Martijn Pieters):

import asyncio
import sys
import json
import os
import websockets


async def socket_consumer(socket, outgoing,
                          log_path=r"/home/host/Desktop/FromSocket.txt"):
    """Forward every message read from the web socket into a queue.

    Each message is also appended to a debug log, rendered with ``ascii()``.

    Args:
        socket: asynchronous iterable yielding the incoming messages.
        outgoing: queue-like object whose ``put()`` coroutine receives
            each message.
        log_path: file the debug lines are appended to.
    """
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)
        # the context manager closes the log even if the write fails
        with open(log_path, "a") as log:
            log.write("From socket: " + ascii(message) + "\n")


async def socket_producer(socket, incoming,
                          log_path=r"/home/host/Desktop/ToSocket.txt"):
    """Send every message taken from a queue out over the web socket.

    Each message is appended to a debug log (rendered with ``ascii()``)
    before it is sent. The loop never ends on its own; it stops only when
    an awaited call raises or the surrounding task is cancelled.

    Args:
        socket: object whose ``send()`` coroutine transmits one message.
        incoming: queue-like object whose ``get()`` coroutine yields the
            next message to send.
        log_path: file the debug lines are appended to.
    """
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        # the context manager closes the log even if the write fails
        with open(log_path, "a") as log:
            log.write("To socket: " + ascii(message) + "\n")
        await socket.send(message)


async def connect_socket(incoming, outgoing, loop=None):
    """Open the web socket and relay messages between it and two queues.

    Args:
        incoming: queue whose messages are sent out over the web socket.
        outgoing: queue that receives every message read from the socket.
        loop: optional event loop the two relay tasks are scheduled on.

    Raises:
        Any exception that ended the relay task which finished first.
    """
    header = {"Authorization": r"Basic XXX="}
    uri = 'XXXXXX'
    async with websockets.connect(uri, extra_headers=header) as web_socket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            socket_consumer(web_socket, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            socket_producer(web_socket, incoming), loop=loop)

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        # instead of being dropped with the finished task
        for task in done:
            task.result()


# pipe support
async def stdio(loop=None):
    """Wrap this process's stdin and stdout in asyncio stream objects.

    Args:
        loop: event loop to attach the pipes to; when ``None`` the result
            of ``asyncio.get_event_loop()`` is used.

    Returns:
        A ``(reader, writer)`` pair: a ``StreamReader`` connected to
        ``sys.stdin`` and a ``StreamWriter`` connected to a binary file
        object opened on stdout's file descriptor.
    """
    event_loop = asyncio.get_event_loop() if loop is None else loop

    stdin_reader = asyncio.StreamReader()

    def reader_protocol_factory():
        # every protocol instance feeds the one shared reader
        return asyncio.StreamReaderProtocol(stdin_reader)

    await event_loop.connect_read_pipe(reader_protocol_factory, sys.stdin)

    # reopen stdout's descriptor in binary mode for the write pipe
    stdout_pipe = os.fdopen(sys.stdout.fileno(), 'wb')
    transport, protocol = await event_loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, stdout_pipe)
    stdout_writer = asyncio.streams.StreamWriter(
        transport, protocol, None, event_loop)

    return stdin_reader, stdout_writer


async def pipe_consumer(pipe_reader, outgoing,
                        log_path=r"/home/host/Desktop/FromPipe.txt"):
    """Read lines from the pipe and push them, decoded, into a queue.

    Each line is decoded as UTF-8 once, appended to a debug log (rendered
    with ``ascii()``) and then put on the queue. The coroutine returns
    when ``readline()`` yields an empty result.

    Args:
        pipe_reader: object whose ``readline()`` coroutine returns bytes.
        outgoing: queue-like object whose ``put()`` coroutine receives
            each decoded line.
        log_path: file the debug lines are appended to.
    """
    # take messages from the pipe and push them into the queue
    while True:
        raw_line = await pipe_reader.readline()
        if not raw_line:
            break
        message = raw_line.decode('utf8')
        # the context manager closes the log even if the write fails
        with open(log_path, "a") as log:
            log.write("From pipe: " + ascii(message) + "\n")

        await outgoing.put(message)


def _append_log(path, text):
    """Append *text* to the file at *path*, always closing it afterwards."""
    with open(path, "a") as log:
        log.write(text)


async def pipe_producer(pipe_writer, incoming,
                        log_path=r"/home/host/Desktop/ToPipe.txt",
                        error_log_path=r"/home/host/Desktop/Error.txt"):
    """Write DENM/CAM messages taken from a queue to the pipe.

    Every queued message is logged and parsed as JSON. Only messages whose
    ``header.messageID`` is 1 or 2 are written to the pipe, UTF-8 encoded
    and newline-terminated; everything else is dropped after logging. The
    loop never ends on its own.

    Args:
        pipe_writer: object with ``write(bytes)`` and a ``drain()``
            coroutine.
        incoming: queue-like object whose ``get()`` coroutine yields the
            next JSON text.
        log_path: file the progress lines are appended to.
        error_log_path: file a marker line is appended to when a message
            cannot be parsed.
    """
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        _append_log(
            log_path, "Send to pipe message: " + ascii(json_message) + "\n")
        try:
            message = json.loads(json_message)
            message_type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was not convertible to an integer
            message_type = None
            _append_log(error_log_path, " Error \n")

        # Log the parsed type itself. The original concatenated the builtin
        # `type` and wrote to an already-closed file, so it crashed here.
        _append_log(log_path, "Send to pipe type: " + str(message_type) + "\n")
        # 1 is DENM message, 2 is CAM message
        if message_type in {1, 2}:
            _append_log(log_path, "Send to pipe: " + json_message + "\n")
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()


async def connect_pipe(incoming, outgoing, loop=None):
    """Relay messages between the process's stdio pipes and two queues.

    Args:
        incoming: queue whose messages are written to the pipe.
        outgoing: queue that receives every line read from the pipe.
        loop: optional event loop used for the pipes and the relay tasks.

    Raises:
        Any exception that ended the relay task which finished first.
    """
    # pass the loop on so the pipes are attached to the same loop the
    # relay tasks below are scheduled on
    reader, writer = await stdio(loop=loop)
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.ensure_future(
        pipe_consumer(reader, outgoing), loop=loop)
    producer_task = asyncio.ensure_future(
        pipe_producer(writer, incoming), loop=loop)

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()


def main():
    """Run the socket and pipe relays together until they finish.

    Two queues connect the relays: one carries pipe input to the web
    socket, the other carries web-socket messages to the pipe.
    """
    loop = asyncio.get_event_loop()
    # asyncio.Queue lost its `loop` argument in Python 3.10; a queue made
    # without it still ends up on the loop that runs the coroutines below
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)

    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))

main()

Для отправки в Pipe я использую этот код:

import pexpect


test = r"/home/host/PycharmProjects/Tim/Tim.py"
# Start the script under python3 as a pexpect child process.
process = pexpect.spawn("python3 " + test)
# Sample message with messageID 2. The original literal was missing its two
# final closing braces, so it was not valid JSON.
message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5}}}}'
# Terminate the message with a newline -- presumably the child reads its
# stdin line by line; confirm against the spawned script.
process.write(message + "\n")
process.wait()

Но как я могу создать скрипт для чтения вместо записи? Я попытался так:

test = r"/home/host/PycharmProjects/Tim/Tim.py"
# Start the script under python3 as a pexpect child; timeout=None presumably
# removes the time limit on reads -- confirm against the pexpect docs.
p = pexpect.spawn("python3 " + test, timeout=None)
# NOTE(review): this loop has no break, so p.wait() below is never reached.
while True:
    # NOTE(review): read() without a size presumably returns only once the
    # child reaches EOF and then keeps returning an empty result -- confirm
    # against the pexpect docs; readline() may be the intended call.
    m = p.read()
    # Append the string form of whatever was read to the log file.
    file = open(r"/home/host/Desktop/OpeListening.txt", "a")
    file.write(str(m))
    file.close()
p.wait()

Но чтение сразу переходит к следующему шагу без каких-либо сообщений. В чём моя ошибка?

1 Ответ

0 голосов
/ 17 сентября 2018

На данный момент я успешно использую Popen:

# Run the script with its stdin and stdout connected to pipes; stderr is
# merged into stdout so both arrive on the same stream.
process = subprocess.Popen(['python3', test], shell=False, stdout=subprocess.PIPE,
                           stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
    result = process.stdout.readline()
    if not result:
        # an empty read means the child closed its output; stop instead of
        # spinning on empty results forever
        break
    result = result.decode("utf-8")
    print(result)
# the original called wait() on the undefined name `proc`
process.wait()
...