Потребитель производителя asyncio звонит из не асинхронного основного потока - PullRequest
0 голосов
/ 10 ноября 2018

Я пытаюсь запустить asyncio в потоке. Причина, по которой я это делаю, заключается в том, что я хочу общаться с asyncio из основного потока, не являющегося асинхронным. Так вот мой код

import asyncio
import threading

async def consume(input_q, output_q):
    while True:
        item = await input_q.get()
        print(item)
        output_q.put("hello back")

async def run(input_q, output_q):
        asyncio.ensure_future(consume(input_q, output_q))
        while True:
            await asyncio.sleep(1)
            print("message")

def run_in_thread(loop,input_q,output_q):
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run(input_q, output_q))
loop = asyncio.new_event_loop()
input_q = asyncio.Queue(loop=loop)
output_q = asyncio.Queue(loop=loop)
thread = threading.Thread(target=run_in_thread ,args=(loop, input_q, output_q))
thread.start()
time.sleep(5)
asyncio.run_coroutine_threadsafe(input_q.put("hello input"), loop)
time.sleep(2)
item = asyncio.run_coroutine_threadsafe(output_q.get_nowait(), loop).result()
assert "hello back" in item
print(item)
loop.call_soon_threadsafe(loop.stop)
thread.join()

Я могу помещать сообщения в очередь ввода из основного потока. Булочка, к сожалению, не умеет читать из output_q. Есть ли решение для этого? Получение ошибки в этой строке:

item = asyncio.run_coroutine_threadsafe(output_q.get_nowait(), loop).result() 
--> raises QueueEmpty

Может быть, используя многопроцессорность или что-то еще То, что я хотел бы сделать, это запустить вышеуказанные функции и общаться с ними из другого процесса или потока, который не использует асинхронный. Большое спасибо за любую подсказку!

PS: я использую python 3.5

...