при запуске asyncio с использованием executors возникает ошибка моего типа возврата, который не вызывается - PullRequest
0 голосов
/ 20 марта 2020

в приведенном ниже коде я пытаюсь найти все запросы ко всем файлам, которые содержат расширение «.py». Сначала я выполняю поиск по списку каталогов в моем пути в ширину и рекурсивно вызываю ту же функцию в полученном списке. подкаталогов. я получаю сообщение об ошибке:

Future exception was never retrieved
future: <Future finished exception=TypeError("'generator' object is not callable")>
Traceback (most recent call last):

код указан ниже

from pathlib import Path
from os.path import sep as pathsep
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(4)

async def main(basedir, query):
    loop.run_in_executor(None, find_files(basedir, query) )

@asyncio.coroutine
def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
            fullpath = str(p.absolute)
            if p.is_dir and not p.is_symlink():
                subdirs.append(p)
            if query_string in fullpath:
                print(fullpath)
    loop =  asyncio.get_event_loop()
    tasks = [loop.run_in_executor(executor, find_files, subdir, query_string) for subdir in subdirs] # this doesnt work
    yield from asyncio.gather(*tasks)
    return subdirs


query = '.py'
basedir = Path(pathsep).absolute() 

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_default_executor(executor)
    try:
        loop.run_until_complete(main(basedir, query))
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        executor.shutdown(wait = True)
        loop.close()

'' '

РЕДАКТИРОВАТЬ: я понимаю ошибки от user4815162342 ответ ниже, это работает сейчас

import asyncio
from pathlib import Path
from os.path import sep as pathsep
from collections import deque
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

query = '.py'
basedir = Path(pathsep).absolute() 
futures = deque()

def find_files(path, query_string):
    subdirs = []
    try:
        for p in path.iterdir():
                fullpath = str(p.absolute)
                if p.is_dir and not p.is_symlink():
                    subdirs.append(p)
                if query_string in fullpath:
                    print(fullpath)
    return subdirs

async def main(executor):
    loop = asyncio.get_event_loop()
    task = [loop.run_in_executor(executor, find_files, basedir, query) ]
    completed, _ = await asyncio.wait(task)
    result = [t.result() for t in completed]
    futures.append(result[0])
    while futures:
        future = futures.popleft()
        for subdir in future:
            task = [loop.run_in_executor(executor, find_files, subdir, query) ]
            completed, _ = await asyncio.wait(task)
            result = [t.result() for t in completed][0]
            futures.append(task[0].result() )


if __name__ == "__main__":
    tstart = time.time()
    executor = ThreadPoolExecutor(
        max_workers=4,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            main(executor)
        )
    finally:
        event_loop.close()    
print('time elapsed is', time.time() - tstart)

1 Ответ

0 голосов
/ 20 марта 2020

Есть несколько проблем с вашим кодом.

  • Во-первых, run_in_executor ожидает вызываемый объект, который он будет вызывать в другом потоке, приостанавливать выполнение текущей сопрограммы, и возобновите его, как только вызываемый вызов произведет результат (или вызовет исключение). Другими словами, run_in_executor ожидает функцию , и вы вместо этого даете ей объект сопрограммы, поэтому вы получаете исключение, что этот объект не вызывается.

  • Во-вторых, вы должны дождаться результата run_in_executor. Отсутствие await является причиной того, что вас предупреждают, что исключение никогда не будет получено.

  • Наконец, вызываемая вами функция уже является сопрограммой, поэтому вам не нужно run_in_executor на первом месте. run_in_executor только для вызова привязанного к процессору или устаревшего кода блокировки. Просто ждите его напрямую: await find_files(...).

Обратите также внимание, что вы должны использовать async def и await вместо asyncio.coroutine decorator и yield from. Последний устарел и скоро будет удален.

...