Как я могу синхронизировать asyncio с другими потоками ОС? - PullRequest
0 голосов
/ 05 ноября 2018

У меня есть программа с одним основным потоком, где я создаю второй поток, который использует asyncio. Есть ли инструменты для синхронизации этих двух потоков? Если бы все было асинхронно, я мог бы сделать это с его примитивами синхронизации, например:

import asyncio

async def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))

Однако это не работает с несколькими потоками. Если я просто использую threading.Event, тогда он заблокирует поток асинхронности. Я понял, что могу отложить ожидание до исполнителя:

import asyncio
import threading

def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

Однако иметь поток исполнителя только для ожидания мьютекса кажется неестественным. Это так, как это должно быть сделано? Или есть другой способ асинхронно ожидать синхронизации между потоками ОС?

1 Ответ

0 голосов
/ 06 ноября 2018

Простой способ синхронизировать асинхронную сопрограмму с событием, приходящим из другого потока, - это ожидать asyncio.Event в taskB и устанавливать его из taskA, используя loop.call_soon_threadsafe.

Чтобы иметь возможность передавать значения и исключения между ними, вы можете использовать фьючерсы; однако тогда вы изобретаете большую часть run_in_executor. Если единственной задачей задачи A является удаление задач из очереди, вы также можете создать пул с одним рабочим и использовать его в качестве рабочего потока. Тогда вы можете использовать run_in_executor как предполагалось:

worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_event_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())

Семантика такая же, как в вашей версии с явной очередью - очередь все еще там, она просто внутри ThreadPoolExecutor.

...