Python Asyncio queue get не получает сообщение - PullRequest
0 голосов
/ 12 сентября 2018

Я публикую новый вопрос, связанный с old , из-за проблемы с очередью get from. Это код (спасибо Martijn Pieters)

import asyncio
import sys
import json
import os
import websockets


async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)
        file = open(r"/home/host/Desktop/FromSocket.txt", "a")
        file.write("From socket: " + ascii(message) + "\n")
        file.close()


async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        file = open(r"/home/host/Desktop/ToSocket.txt", "a")
        file.write("To socket: " + ascii(message) + "\n")
        file.close()
        await socket.send(message)


async def connect_socket(incoming, outgoing, loop=None):
    header = {"Authorization": r"Basic XXX="}
    uri = 'XXXXXX'
    async with websockets.connect(uri, extra_headers=header) as web_socket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            socket_consumer(web_socket, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            socket_producer(web_socket, incoming), loop=loop)

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()


# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), 'wb'))
    writer = asyncio.streams.StreamWriter(
        writer_transport, writer_protocol, None, loop)

    return reader, writer


async def pipe_consumer(pipe_reader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipe_reader.readline()
        if not message:
            break
        file = open(r"/home/host/Desktop/FromPipe.txt", "a")
        file.write("From pipe: " + ascii(message.decode('utf8')) + "\n")
        file.close()

        await outgoing.put(message.decode('utf8'))


async def pipe_producer(pipe_writer, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        file = open(r"/home/host/Desktop/ToPipe.txt", "a")
        file.write("Send to pipe message: " + ascii(json_message) + "\n")
        file.close()
        try:
            message = json.loads(json_message)
            message_type = int(message.get('header', {}).get('messageID', -1))

        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was convertable to an integer
            message_type = None
            file = open(r"/home/host/Desktop/Error.txt", "a")
            file.write(" Error \n")
            file.close()
        # 1 is DENM message, 2 is CAM message
        file.write("Send to pipe type: " + type)
        if message_type in {1, 2}:
            file.write("Send to pipe: " + json_message)
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()


async def connect_pipe(incoming, outgoing, loop=None):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.ensure_future(
        pipe_consumer(reader, outgoing), loop=loop)
    producer_task = asyncio.ensure_future(
        pipe_producer(writer, incoming), loop=loop)

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()


def main():
    loop = asyncio.get_event_loop()
    pipe_to_socket = asyncio.Queue(loop=loop)
    socket_to_pipe = asyncio.Queue(loop=loop)

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)

    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))

main()

Этот код является дочерним процессом, вызываемым из родительского элемента

subprocess.Popen(["python3", test], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

Проблема в том, что объект находится в очереди на socket_consumer (получен из сокета), но pipe_producer не идет вперед от incoming.get(). Запись в файл предназначена только для тестирования.

Родитель на данный момент это (только для теста)

test = r"/home/host/PycharmProjects/Tim/Tim.py"
process = subprocess.Popen(["python3", test],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

for i in range(5):
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
    jsonValueBytes = message.encode("utf-8")
    process.stdin.write(jsonValueBytes + b"\n")

process.stdin.close()
process.wait()

Вместо отправки в веб-сокет я использую этот код:

#!/usr/bin/env python

import asyncio
import websockets

async def hello(uri):
    header = {"Authorization": r"Basic XXXX="}
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5,"referencePosition":{"latitude":451114425,"longitude":76720957,"positionConfidenceEllipse":{"semiMajorConfidence":4095,"semiMinorConfidence":4095,"semiMajorOrientation":3601},...other fields}}';
    async with websockets.connect(uri, extra_headers=header) as websocket:
        await websocket.send(message)


asyncio.get_event_loop().run_until_complete(
    hello('XXX'))

Он отправляет через канал и работает, потому что я получаю по каналу и отправляю в сокет (файлы FromPipe.txt. И ToSocket.txt верны).
Затем у меня есть код для отправки на сервер с открытым веб-сокетом, и этот сервер отправляет сообщение ребенку. Когда ребенок получает из сокета файл FromSocket.txt, он создается, но файл ToPipe.txt не создается, пока я не поставлю его перед awit incoming.get()

.

FromSocket.txt имеет это содержание:

From socket: '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1, ... other field}}'

Но если при извлечении типа возникнет проблема, он создаст файл, поскольку это первая инструкция после json_message = await incoming.get() Я думаю, это проблема с очередью. Для теста я положил incoming.get() в socket_consumer после ожидания outgoing.put(message), и он работает.

ОБНОВЛЕНИЕ: Если я запускаю только дочерний элемент (то есть без канала), то файл ToPipe.txt является правильным, и передача сообщения из сокета в канал - это нормально. Для моего теста я запускаю родителя, он отправляет в конвейер одно сообщение, которое потомок отправляет сокету, затем я отправляю сообщение в сокет, и потомок ловит это сообщение, но не отправляет в канал и ToPipe.txt. не создан. Может быть, есть проблема в основном методе

1 Ответ

0 голосов
/ 12 сентября 2018

Вы записываете JSON с двойным кодированием в дочерний процесс:

message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
jsonValue = json.dumps(message)

message уже является строкой JSON, поэтому jsonValue является строкой JSON с двойным кодированием.

Потребитель канала помещает эту строку с двойным кодированием в очередь для сокета.Затем производитель веб-сокетов в socket_producer() снова кодирует сообщение :

while True:
    message = await incoming.get()
    # ...
    json_message = json.dumps(message)
    await socket.send(json_message)

Так что теперь json_message представляет собой значение JSON с тройным кодированием, документ JSON, содержащий документ JSON, содержащийдокумент JSON:

>>> import json
>>> message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'  # valid JSON
>>> json_message = json.dumps(message)
>>> print(json_message)  # double-encoded
"{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"
>>> json_message = json.dumps(json_message)  # encode *again*
>>> print(json_message)  # triple-encoded
"\"{\\\"header\\\":{\\\"protocolVersion\\\":1,\\\"messageID\\\":2,\\\"stationID\\\":400}}}\""

Я не знаю точно, что делает с этим ваш веб-сокет, но давайте предположим, что он использует json.loads() один раз, а затем эхом отдает декодированное сообщение.Это означает, что socket_consumer() получает документ JSON, который закодирован только дважды.Ваш журнал FromSocket.txt определенно подразумевает, что именно так и происходит, потому что он содержит JSON-сообщение с кодировкой double :

Это можно увидеть в журнале FromSocket.txt:

From socket: "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400},\"cam\":{\"generationDeltaTime\":1,...other fields}}"

Обратите внимание на эти \" записи, и весь документ обернут в кавычки, но в этом значении нет \\\ тройной обратной косой черты.

Тем не менее этот дополнительный уровень кодирования JSON нарушаетpipe_producer() сопрограмма, которая ожидает, что сообщение будет декодировано в словарь, а не в другую строку (даже если эта строка содержит другой документ JSON):

message = json.loads(json_message)
type = int(message.get('header', {}).get('messageID', -1))

message будет декодировать вместо строки, поэтому message.get завершится с ошибкой AttributeError, что приведет к выходу сопрограммы:

>>> json_message = "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"  # double encoded
>>> message = json.loads(json_message)
>>> message  # Back one stop, single-encoded JSON
'{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'
>>> type(message)  # it's a string with JSON, not a dictionary
<class 'str'>
>>> message.get('header')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute 'get'

Вам необходимо убедиться, что вы не кодируете свои данные слишком много раз!Если ваш канал получает данные JSON, не кодирует данные снова при отправке в сокет.При отправке данных в канал из родительского процесса не следует дважды кодировать данные , если у вас уже есть строка JSON, нет смысла передавать ее через json.dumps() еще раз.

Было бы также разумно добавить дополнительные функции безопасности в сопрограммы.Я не сделал JSON-декодирование достаточно надежным, поэтому давайте исправим эту часть:

async def pipe_producer(pipe_writer, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        try:
            message = json.loads(json_message)
            type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was convertable to an integer
            type = None
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()

Возможно, вы захотите записать, что где-то не удалось декодировать (отправлять сообщения в очередь журналов, которую отдельная задача получает дозапись в журнал).

Далее мы можем обновить функции connect_*, чтобы не игнорировать исключения в задачах, которые завершаются:

done, pending = await asyncio.wait(
    [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
    task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
    task.result()

Проверка done.result() может повторно вызватьисключение, брошенное в потребителя или производителя.Так как сопрограммы connect_* запускаются через asyncio.gather(), который, в свою очередь, запускается loop.run_until_complete(), то это исключение затем распространяется до функции main(), поэтому он выйдет из Python и вы увидитеобратная связь напечатана.Я обновил свой другой ответ, добавив в него цикл for task in done: task.result(), так как в любом случае это хорошая практика.

С просто циклом task.result() в моем исходном коде ответа и веб-сокетом, который простовозвращает сообщение и вводя действительный документ JSON (без двойного кодирования), я сразу вижу ошибку;родительский процесс здесь - мой терминал, поэтому я просто копирую сообщение JSON в окно моего терминала для отправки данных в канал:

$ python3.7 so52291218.py
{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
Traceback (most recent call last):
  File "so52291218.py", line 140, in <module>
    main()
  File "so52291218.py", line 137, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "/.../lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "so52291218.py", line 126, in connect_pipe
    task.result()
  File "so52291218.py", line 104, in pipe_producer
    type = int(message.get("header", {}).get("messageID", -1))
AttributeError: 'str' object has no attribute 'get'

Когда я удаляю вызов json.dumps() из socket_producer() или Я изменяю свой сервер веб-сокетов, чтобы использовать json.loads() для входящего сообщения и отправлять его как результат, затем код работает, и я получаю то же сообщение, возвращаемое обратно на мой терминал.

Обратите внимание, что вы не можете просто использовать цикл для записи в subprocess.Popen() трубу , когда оба stdin и stdout являются каналами.Вы можете легко заставить дочерний процесс зависать при вводе / выводе, только записывая в цикл.Вы также должны были бы читать из канала stdout, но поскольку дочерний процесс будет читать и писать из этих дескрипторов в фактически произвольном порядке, ваш родительский процесс также должен обрабатыватьввод / вывод для Popen() труб асинхронно.

Вместо того, чтобы писать, как это сделать (что уже описано в других разделах Stack Overflow), я вместо этого советую вам использовать pexpect проект , как это уже сделал все, что работает для вас (путем создания отдельного потока, который постоянно читает из канала stdout);использование pexpect.popen_spawn.PopenSpawn(), чтобы сохранить это значение близко к вашей первоначальной настройке, выглядело бы так:

import sys
import pexpect

test = '...'
process = pexpect.popen_spawn.PopenSpawn([sys.executable, test])

for i in range(5):
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}';
    jsonValueBytes = message.encode("utf-8")
    process.send(jsonValueBytes + b"\n")

    # echo anything coming back
    while True:
        index = process.expect([process.crlf, pexpect.EOF, pexpect.TIMEOUT], timeout=0.1)
        if not process.before:
            break
        print('>>>', process.before.decode('utf8', errors='replace'), flush=True)

# send EOF to close the pipe, then terminate the process
process.sendeof()
process.kill(1)
process.wait()

Таким образом, каждый раз, когда мы отправляем полную линию в канал, мы также ищем линии, идущие в другую сторону, скороткий тайм-аут и вывод любых подобных строк.

Со всеми исправлениями на месте (избегая многократного кодирования сообщений JSON) и очень простым эхо-сервером websocket , pexpectкод выше печатает:

>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}

Показывает, что существует полный путь туда и обратно от родительского процесса к дочернему процессу к веб-сокету и обратно.

...