Вот как я бы написал (псевдо-) код:
async def process_file(input_file):
# load the file synchronously
with open(input_file) as fd:
data = json.load(fd)
# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
for key, sub in data.items():
if sub['updated'] is None:
sub['updated'] = 'in_progress'
nursery.start_soon(post_update, {key: sub})
# save your result json synchronously
save_file(data, input_file)
trio
гарантирует вам, что после выхода из блока async with
каждое задание , которое вы породили,завершите, так что вы можете безопасно сохранить ваш файл, потому что больше не будет обновлений.
Я также удалил функцию grab_next_entry
, потому что мне кажется, что эта функция будет перебирать те же ключи (постепенно) при каждом вызове (придавая сложности O (n!)), в то время как вы могли бы упростить ее, просто перебирая один раз ваш диктант (понижая сложность до O (n))
Вам также не нужен Semaphore
, кроме случаев, еслиВы хотите ограничить количество параллельных вызовов post_update
.Но trio
также предлагает встроенный механизм для этого благодаря CapacityLimiter , который вы будете использовать следующим образом:
limit = trio.CapacityLimiter(10)
async with trio.open_nursery() as nursery:
async with limit:
for x in z:
nursery.start_soon(func, x)
ОБНОВЛЕНИЕ благодаря @ njsmith *Комментарий 1023 *
Итак, чтобы ограничить количество одновременных post_update
, вы перепишите его так:
async def post_update(data, limit):
async with limit:
...
И затем вы можете переписатьпредыдущий цикл выглядит так:
limit = trio.CapacityLimiter(10)
# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
for key, sub in data.items():
if sub['updated'] is None:
sub['updated'] = 'in_progress'
nursery.start_soon(post_update, {key: sub}, limit)
Таким образом, мы создаем n задач для записей n в вашем data-dict, но если их больше 10задачи выполняются одновременно, тогда дополнительные должны будут ждать освобождения лимита (в конце блока async with limit
).