Вы не используете ThreadPoolExecutor
правильно, и вы действительно не хотите использовать это здесь. Вместо этого вам нужно настроить потребителей и производителей для обработки вашего сокета и канала с очередями для отправки сообщений между ними.
для каждого типа соединения создайте сопрограмму, которая создает соединение, а затем передаст это одиночное соединение как потребителю, так и производителю tasks (созданный с помощью asyncio.create_task()
) для этой связи. Используйте asyncio.wait()
для запуска обеих задач с помощью return_when=asyncio.FIRST_COMPLETED
, так что вы можете отменить любую, которая все еще выполняется, когда один из двух завершает «раннее» (например, сбой).
Используйте очередь для передачи сообщений от одного получателя к производителю другого соединения.
sys.stdin
и sys.stdout
являются блокирующими потоками, не просто читайте и пишите в них! См. https://gist.github.com/nathan-hoad/8966377, где описана попытка установить неблокирующие потоки STDIO, и эта проблема асинхронного ввода , которая запрашивает функцию неблокирующих потоков.
Не используйте глобальное сокетное соединение, конечно же, не с двумя отдельными async with
операторами. Ваш метод send_to_socket()
на самом деле закроет сокет, потому что диспетчер контекста async with connection as web_socket:
завершает работу при отправке первого сообщения, и это вызывает проблемы для кода socket_receiver
, который предполагает, что сокет остается открытым бесконечно долго.
Не используйте многопоточность здесь! Ваши соединения полностью управляются asyncio, поэтому многопоточность в этом случае сильно помешает.
asyncio.Executor()
экземпляры должны использоваться только с обычными вызовами, не с сопрограммами. Executor.submit()
заявляет, что требуется вызов, передача сопрограммы с executor.submit(send_to_pipe(message))
или executor.submit(send_to_socket(message))
вызовет исключение, так как сопрограммы не могут быть вызваны. Вероятно, вы не видите сообщение об исключении, поскольку это исключение возникает в другом потоке.
Это причина, по которой ваш socket_receiver()
сопрограмма не работает; конечно, запускается , но попытки отправить сообщение не удаются. Когда я запускаю ваш код на локальном макетированном сервере веб-сокетов, выдается предупреждение:
RuntimeWarning: coroutine 'send_to_socket' was never awaited
executor.submit(send_to_socket(message))
Когда сопрограмма не ожидается, код в этой сопрограмме никогда не выполняется. Оборачивая сопрограмму в ту, которая печатает исключение для stderr (try: callable(), except Exception: traceback.print_exc(file=sys.stderr))
), вы получаете:
Traceback (most recent call last):
File "soq52219672.py", line 15, in log_exception
callable()
TypeError: 'coroutine' object is not callable
Исполнители должны использоваться только для интеграции кода, который не может быть преобразован в использование сопрограмм; исполнитель управляет этим кодом для выполнения параллельно задачам asyncio
без помех. Следует соблюдать осторожность, если этот код хочет взаимодействовать с asyncio
задачами, всегда используйте asyncio.run_coroutine_threadsafe()
или asyncio.call_soon_threadsafe()
для вызова через границу. См. Параллелизм и многопоточность раздел .
Вот пример того, как я переписал бы ваш код для использования шаблона потребитель / производитель, с stdio()
, основанным на грифе Натана Хоада на тему , плюс запасной вариант для Windows, где поддержка обработки stdio как каналов ограничена :
import asyncio
import json
import os
import sys
import websockets
async def socket_consumer(socket, outgoing):
# take messages from the web socket and push them into the queue
async for message in socket:
await outgoing.put(message)
async def socket_producer(socket, incoming):
# take messages from the queue and send them to the socket
while True:
message = await incoming.get()
jsonmessage = json.dumps(message)
await socket.send(jsonmessage)
async def connect_socket(incoming, outgoing):
header = {"Authorization": r"Basic XXXX="}
uri = 'wss://XXXXXXXX'
async with websockets.connect(uri, extra_headers=header) as websocket:
# create tasks for the consumer and producer. The asyncio loop will
# manage these independently
consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))
producer_task = asyncio.create_task(socket_producer(websocket, incoming))
# start both tasks, but have the loop return to us when one of them
# has ended. We can then cancel the remainder
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
task.result()
# pipe support
async def stdio(loop=None):
if loop is None:
loop = asyncio.get_running_loop()
if sys.platform == 'win32':
# no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
# use an executor to read from stdio and write to stdout
class Win32StdinReader:
def __init__(self):
self.stdin = sys.stdin.buffer
async def readline():
# a single call to sys.stdin.readline() is thread-safe
return await loop.run_in_executor(None, self.stdin.readline)
class Win32StdoutWriter:
def __init__(self):
self.buffer = []
self.stdout = sys.stdout.buffer
def write(self, data):
self.buffer.append(data)
async def drain(self):
data, self.buffer = self.buffer, []
# a single call to sys.stdout.writelines() is thread-safe
return await loop.run_in_executor(None, sys.stdout.writelines, data)
return Win32StdinReader(), Win32StdoutWriter()
reader = asyncio.StreamReader()
await loop.connect_read_pipe(
lambda: asyncio.StreamReaderProtocol(reader),
sys.stdin
)
writer_transport, writer_protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin,
os.fdopen(sys.stdout.fileno(), 'wb')
)
writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)
return reader, writer
async def pipe_consumer(pipereader, outgoing):
# take messages from the pipe and push them into the queue
while True:
message = await pipereader.readline()
if not message:
break
await outgoing.put(message.decode('utf8'))
async def pipe_producer(pipewriter, incoming):
# take messages from the queue and send them to the pipe
while True:
jsonmessage = await incoming.get()
message = json.loads(jsonmessage)
type = int(message.get('header', {}).get('messageID', -1))
# 1 is DENM message, 2 is CAM message
if type in {1, 2}:
pipewriter.write(jsonmessage.encode('utf8') + b'\n')
await pipewriter.drain()
async def connect_pipe(incoming, outgoing):
reader, writer = await stdio()
# create tasks for the consumer and producer. The asyncio loop will
# manage these independently
consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))
producer_task = asyncio.create_task(pipe_producer(writer, incoming))
# start both tasks, but have the loop return to us when one of them
# has ended. We can then cancel the remainder
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
task.result()
async def main():
pipe_to_socket = asyncio.Queue()
socket_to_pipe = asyncio.Queue()
socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)
pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)
await asyncio.gather(socket_coro, pipe_coro)
if __name__ == '__main__':
asyncio.run(main())
Затем начинается две задачи: одна для управления сокетом, другая для управления каналом STDIO. Каждый из них запускает еще 2 задания для своего потребителя и производителя. Есть две очереди для отправки сообщений от потребителя одного и производителю другого.