Создать две одновременно асинхронные задачи с асинхронным - PullRequest
0 голосов
/ 07 сентября 2018

Мне нужно создать программное обеспечение, которое получает параллельно от веб-сокета и канала и отправляет сообщения по другому каналу (получает из сокета, создает новый поток и отправляет в канал. Таким же образом получает из канал, создает новый поток и отправляет в сокет).

У меня проблема с многопоточностью, при загрузке программы мне нужно запустить методы socket_receiver и pipe_receiver, но я могу только запустить pipe_receiver. Я попытался удалить весь код и оставить только socket_receiver и pipe_receiver, но он входит только в while True из pipe_receiver.

import asyncio
import sys
import json
from concurrent.futures.thread import ThreadPoolExecutor
import websockets

# make the Pool of workers
executor = ThreadPoolExecutor(max_workers=10)
# Make connection to socket and pipe
header = {"Authorization": r"Basic XXXX="}
connection = websockets.connect('wss://XXXXXXXX', extra_headers=header)


async def socket_receiver():
    """Listening from web socket"""
    async with connection as web_socket:
        while True:
            message = await web_socket.recv()
            # send the message to the pipe in a new thread
            executor.submit(send_to_pipe(message))


async def pipe_receiver():
    """Listening from pipe"""
    while True:
        message = sys.stdin.readline()
        if not message:
            break
        executor.submit(send_to_socket(message))
        # jsonValue = json.dump(str(line), file);
        sys.stdout.flush()


def send_to_pipe(message):
    # Check if message is CAM or DENM
    json_message = json.loads(message)
    type = int(json_message["header"]["messageID"])
    # 1 is DENM message, 2 is CAM message
    if type == 1  or type == 2:
        # send the message to the pipe
        sys.stdout.print(json_message);


async def send_to_socket(message):
     async with connection as web_socket:
        json_message = json.dumps(message)
        await web_socket.send(json_message)


asyncio.get_event_loop().run_until_complete(
    asyncio.gather(socket_receiver(),pipe_receiver()))

Эта программа вызывается подпроцессом, родительский процесс связывается с ним через каналы, подключенные к stdout и stdin.

ОБНОВЛЕНИЕ: я получаю это исключение с кодом @Martijn Pieters

Traceback (most recent call last):
  File "X", line 121, in <module>
    main()
  File "X", line 119, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "X\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 568, in run_until_complete
    return future.result()
  File "X", line 92, in connect_pipe
    reader, writer = await stdio()
  File "X", line 53, in stdio
    lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
  File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 1421, in connect_read_pipe
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)
  File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 433, in _make_read_pipe_transport
    raise NotImplementedError
NotImplementedError

1 Ответ

0 голосов
/ 07 сентября 2018

Вы не используете ThreadPoolExecutor правильно, и вы действительно не хотите использовать это здесь. Вместо этого вам нужно настроить потребителей и производителей для обработки вашего сокета и канала с очередями для отправки сообщений между ними.

  • для каждого типа соединения создайте сопрограмму, которая создает соединение, а затем передаст это одиночное соединение как потребителю, так и производителю tasks (созданный с помощью asyncio.create_task()) для этой связи. Используйте asyncio.wait() для запуска обеих задач с помощью return_when=asyncio.FIRST_COMPLETED, так что вы можете отменить любую, которая все еще выполняется, когда один из двух завершает «раннее» (например, сбой).

  • Используйте очередь для передачи сообщений от одного получателя к производителю другого соединения.

  • sys.stdin и sys.stdout являются блокирующими потоками, не просто читайте и пишите в них! См. https://gist.github.com/nathan-hoad/8966377, где описана попытка установить неблокирующие потоки STDIO, и эта проблема асинхронного ввода , которая запрашивает функцию неблокирующих потоков.

  • Не используйте глобальное сокетное соединение, конечно же, не с двумя отдельными async with операторами. Ваш метод send_to_socket() на самом деле закроет сокет, потому что диспетчер контекста async with connection as web_socket: завершает работу при отправке первого сообщения, и это вызывает проблемы для кода socket_receiver, который предполагает, что сокет остается открытым бесконечно долго.

  • Не используйте многопоточность здесь! Ваши соединения полностью управляются asyncio, поэтому многопоточность в этом случае сильно помешает.

  • asyncio.Executor() экземпляры должны использоваться только с обычными вызовами, не с сопрограммами. Executor.submit() заявляет, что требуется вызов, передача сопрограммы с executor.submit(send_to_pipe(message)) или executor.submit(send_to_socket(message)) вызовет исключение, так как сопрограммы не могут быть вызваны. Вероятно, вы не видите сообщение об исключении, поскольку это исключение возникает в другом потоке.

    Это причина, по которой ваш socket_receiver() сопрограмма не работает; конечно, запускается , но попытки отправить сообщение не удаются. Когда я запускаю ваш код на локальном макетированном сервере веб-сокетов, выдается предупреждение:

    RuntimeWarning: coroutine 'send_to_socket' was never awaited
      executor.submit(send_to_socket(message))
    

    Когда сопрограмма не ожидается, код в этой сопрограмме никогда не выполняется. Оборачивая сопрограмму в ту, которая печатает исключение для stderr (try: callable(), except Exception: traceback.print_exc(file=sys.stderr))), вы получаете:

    Traceback (most recent call last):
      File "soq52219672.py", line 15, in log_exception
        callable()
    TypeError: 'coroutine' object is not callable
    

Исполнители должны использоваться только для интеграции кода, который не может быть преобразован в использование сопрограмм; исполнитель управляет этим кодом для выполнения параллельно задачам asyncio без помех. Следует соблюдать осторожность, если этот код хочет взаимодействовать с asyncio задачами, всегда используйте asyncio.run_coroutine_threadsafe() или asyncio.call_soon_threadsafe() для вызова через границу. См. Параллелизм и многопоточность раздел .

Вот пример того, как я переписал бы ваш код для использования шаблона потребитель / производитель, с stdio(), основанным на грифе Натана Хоада на тему , плюс запасной вариант для Windows, где поддержка обработки stdio как каналов ограничена :

import asyncio
import json
import os
import sys

import websockets

async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)

async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        jsonmessage = json.dumps(message)
        await socket.send(jsonmessage)

async def connect_socket(incoming, outgoing):
    header = {"Authorization": r"Basic XXXX="}
    uri = 'wss://XXXXXXXX'
    async with websockets.connect(uri, extra_headers=header) as websocket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))
        producer_task = asyncio.create_task(socket_producer(websocket, incoming))

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        for task in done:
            task.result()


# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_running_loop()

    if sys.platform == 'win32':
        # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
        # use an executor to read from stdio and write to stdout
        class Win32StdinReader:
            def __init__(self):
                self.stdin = sys.stdin.buffer 
            async def readline():
                # a single call to sys.stdin.readline() is thread-safe
                return await loop.run_in_executor(None, self.stdin.readline)

        class Win32StdoutWriter:
            def __init__(self):
                self.buffer = []
                self.stdout = sys.stdout.buffer
            def write(self, data):
                self.buffer.append(data)
            async def drain(self):
                data, self.buffer = self.buffer, []
                # a single call to sys.stdout.writelines() is thread-safe
                return await loop.run_in_executor(None, sys.stdout.writelines, data)

        return Win32StdinReader(), Win32StdoutWriter()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        sys.stdin
    )

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(sys.stdout.fileno(), 'wb')
    )
    writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)

    return reader, writer

async def pipe_consumer(pipereader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipereader.readline()
        if not message:
            break
        await outgoing.put(message.decode('utf8'))

async def pipe_producer(pipewriter, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        jsonmessage = await incoming.get()
        message = json.loads(jsonmessage)
        type = int(message.get('header', {}).get('messageID', -1))
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipewriter.write(jsonmessage.encode('utf8') + b'\n')
            await pipewriter.drain()

async def connect_pipe(incoming, outgoing):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))
    producer_task = asyncio.create_task(pipe_producer(writer, incoming))

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()

async def main():
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)

    await asyncio.gather(socket_coro, pipe_coro)

if __name__ == '__main__':
    asyncio.run(main())

Затем начинается две задачи: одна для управления сокетом, другая для управления каналом STDIO. Каждый из них запускает еще 2 задания для своего потребителя и производителя. Есть две очереди для отправки сообщений от потребителя одного и производителю другого.

...