в приведенном ниже коде я пытаюсь найти все запросы ко всем файлам, которые содержат расширение «.py». Сначала я выполняю поиск по списку каталогов в моем пути в ширину и рекурсивно вызываю ту же функцию в полученном списке. подкаталогов. я получаю сообщение об ошибке:
Future exception was never retrieved
future: <Future finished exception=TypeError("'generator' object is not callable")>
Traceback (most recent call last):
код указан ниже
from pathlib import Path
from os.path import sep as pathsep
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(4)
async def main(basedir, query):
loop.run_in_executor(None, find_files(basedir, query) )
@asyncio.coroutine
def find_files(path, query_string):
subdirs = []
for p in path.iterdir():
fullpath = str(p.absolute)
if p.is_dir and not p.is_symlink():
subdirs.append(p)
if query_string in fullpath:
print(fullpath)
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(executor, find_files, subdir, query_string) for subdir in subdirs] # this doesnt work
yield from asyncio.gather(*tasks)
return subdirs
query = '.py'
basedir = Path(pathsep).absolute()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_default_executor(executor)
try:
loop.run_until_complete(main(basedir, query))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
executor.shutdown(wait = True)
loop.close()
'' '
РЕДАКТИРОВАТЬ: я понимаю ошибки от user4815162342 ответ ниже, это работает сейчас
import asyncio
from pathlib import Path
from os.path import sep as pathsep
from collections import deque
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
query = '.py'
basedir = Path(pathsep).absolute()
futures = deque()
def find_files(path, query_string):
subdirs = []
try:
for p in path.iterdir():
fullpath = str(p.absolute)
if p.is_dir and not p.is_symlink():
subdirs.append(p)
if query_string in fullpath:
print(fullpath)
return subdirs
async def main(executor):
loop = asyncio.get_event_loop()
task = [loop.run_in_executor(executor, find_files, basedir, query) ]
completed, _ = await asyncio.wait(task)
result = [t.result() for t in completed]
futures.append(result[0])
while futures:
future = futures.popleft()
for subdir in future:
task = [loop.run_in_executor(executor, find_files, subdir, query) ]
completed, _ = await asyncio.wait(task)
result = [t.result() for t in completed][0]
futures.append(task[0].result() )
if __name__ == "__main__":
tstart = time.time()
executor = ThreadPoolExecutor(
max_workers=4,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
main(executor)
)
finally:
event_loop.close()
print('time elapsed is', time.time() - tstart)