Asyncio в Python: как я могу дождаться завершения некоторых файлов при запуске сопрограмм - PullRequest
0 голосов
/ 05 ноября 2019

Я создаю программу, которая rsync передает данные с нескольких удаленных станций (~ 200). Иногда существует несколько файлов на станцию ​​и группы станций, с которыми НЕ следует связываться одновременно, иначе соединение будет закрыто. В настоящее время у меня есть асинхронная подпрограмма (использующая asyncio в python), которая асинхронно rsync передает все файлы одновременно (на соответствующие станции). Это приводит к закрытию соединения, если мы получаем отставание файлов или связываемся со станциями, которые находятся в той же группе в то же время.

Что мне нужно сделать, это создать сгруппированные задачи, где в группе станций она ожидает обновления предыдущего файла (updateFile ()) перед запуском следующего файла, но асинхронно запускает все группы станций вв то же время.

Новичок в асинхронном программировании, и я просто не могу понять, как заставить эту проблему работать.

В настоящее время мне удалось запустить все асинхронно.

Стартовый цикл

loop = asyncio.get_event_loop()

Создание сгруппированных задач станций и отдельных задач станций

tasks=[]
for group, files in file_groups.items():
   files_in_task = []
   for order, fqdn, file in files:
       if group == 'none':
           futures = [updateFile(file, fqdn)]
           tasks.append(asyncio.gather(*futures))
       else: # In Group
           x = (file, fqdn)
           files_in_task.append(x)
       futures = [updateFile(file,fqdn) for (file,fqdn) in files_in_task]
       tasks.append(asyncio.wait(*futures))

Запуск цикла событий, пока не будут возвращены все задачи.

loop.run_until_complete(asyncio.wait(tasks))
loop.close()

1 Ответ

1 голос
/ 05 ноября 2019

Если мое понимание верно, вы хотите реализовать группы, которые работают параллельно, с каждой отдельной группой, выполняющей свои элементы в последовательности.

Сопрограмма, которая обновляет одну группу, будет состоять из простого цикла с использованием await для последовательной оценки отдельных элементов:

async def update_group(files):
    # update a single group, by running updateFiles in series
    for _order, fqdn, filename in files:
        await updateFile(filename, fqdn)

Для обновления всех групп необходимо запустить несколько экземпляров update_group() параллельно . В asyncio единицей паралеллизма является задача , поэтому мы создаем одну для каждого update_group() и используем asyncio.gather, чтобы дождаться завершения всех задач, позволяя им запускатьсяпараллельно:

async def update_all_groups(file_groups):
    # update all the different groups in parallel
    tasks = []
    for _group, files in file_groups.items():
        tasks.append(asyncio.create_task(update_group(files)))
    await asyncio.gather(*tasks)

Наконец, вся установка вызывается из кода синхронизации, в идеале с помощью одного вызова asyncio.run, который устанавливает, запускает и закрывает цикл:

asyncio.run(update_all_groups(file_groups))
...