Как общаться между традиционным потоком и потоком asyncio в Python? - PullRequest
1 голос
/ 24 апреля 2019

В python, каков идиоматический способ установить одностороннюю связь между двумя threading.Thread s, назовите их thread a и thread b.

a - производитель, он непрерывно генерирует значения для b для потребления.

b - это потребитель, он читает одно значение, сгенерированное a, обрабатывает значение с сопрограммой, а затем читает следующее значение и т. Д.

Иллюстрация:

q = very_magic_queue.Queue()


def worker_of_a(q):
    while True:
        q.put(1)
        time.sleep(1)

a = threading.Thread(worker_of_a, args=(q,))
a.start()


async def loop(q):
    while True:
        # v must be processed in the same order as they are produced
        v = await q.get()
        print(v)

async def foo():
    pass

async def b_main(q):
    loop_fut = asyncio.ensure_future(loop(q))
    foo_fut = asyncio.ensure_future(foo())
    _ = await asyncio.wait([loop_fut, foo_fut], ...)
    # blah blah blah

def worker_of_b(q):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(b_main(q))

b = threading.Thread(worker_of_b, args=(q,))
b.start()

Конечно, приведенный выше код не работает, потому что queue.Queue.get не может быть await ted и asyncio.Queue не может использоваться в другом потоке.

Мне также нужен канал связи от b до a.

Было бы здорово, если бы решение могло работать и с gevent.

Спасибо:)

1 Ответ

0 голосов
/ 24 апреля 2019

Вы можете использовать синхронизированную очередь из модуля queue и отложить ожидание до ThreadPoolExecutor:

async def loop(q):
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=1) as executor:
        loop = asyncio.get_event_loop()
        while True:
            # v must be processed in the same order as they are produced
            v = await loop.run_in_executor(executor, q.get)
            print(v)
...