Как использовать asyncio с ProcessPoolExecutor - PullRequest
0 голосов
/ 23 мая 2019

Я ищу огромное количество адресов в сети, я хочу использовать как asyncio, так и ProcessPoolExecutor в своей задаче для быстрого поиска адресов.

    async def main():
        n_jobs = 3
        addresses = [list of addresses]
        _addresses = list_splitter(data=addresses, n=n_jobs)
        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
             futures_list = []
             for _address in _addresses:
                futures_list +=[asyncio.get_event_loop().run_in_executor(executor, execute_parallel, _address)]

                for f in tqdm(as_completed(futures_list, loop=asyncio.get_event_loop()), total=len(_addresses)):
                results = await f

asyncio.get_event_loop().run_until_complete(main())

Ожидаемый результат: Я хочу, чтобы execute_parallel функция должна работать параллельно.

ошибка:

    Traceback (most recent call last):
  File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 228, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 224, in main
    results = await f
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 533, in _wait_for_one
    return f.result()  # May raise f.exception().
TypeError: can't pickle coroutine objects

1 Ответ

1 голос
/ 29 мая 2019

Я не уверен, что отвечаю на правильный вопрос, но, похоже, цель вашего кода - запустить функцию execute_parallel в нескольких процессах с использованием Asyncio.В отличие от использования ProcessPoolExecutor, почему бы не попробовать что-то вроде использования обычного многопроцессорного пула и настройки отдельных циклов Asyncio для запуска в каждом из них.Вы можете настроить один процесс на ядро ​​и позволить Asyncio работать со своим волшебством в каждом процессе.

async def run_loop(addresses):
    loop = asyncio.get_event_loop()
    loops = [loop.create_task(execute_parallel, address) for address in addresses]
    loop.run_until_complete(asyncio.wait(loops))

def main():
    n_jobs = 3
    addresses = [list of addresses]
    _addresses = list_splitter(data=addresses, n=n_jobs)
    with multiprocessing.Pool(processes=n_jobs) as pool:
        pool.imap_unordered(run_loop, _addresses)

Я использовал Pool.imap_unordered с большим успехом, но в зависимости от ваших потребностей вы можете предпочесть Pool.map иликакой-то другой функционал.Вы можете поэкспериментировать с размером фрагмента или с количеством адресов в каждом списке, чтобы достичь оптимальных результатов (т. Е. Если вы получаете много тайм-аутов, возможно, вы захотите уменьшить количество адресов, обрабатываемых одновременно)

...