Как структурировать начало и окончание синхронных звонков с помощью трио? - PullRequest
2 голосов
/ 20 июня 2019

Я прошу о структурированном псевдокоде трио (фактические трио-вызовы функций, но пустое заполнение «рабочий-делает-здесь»), чтобы я мог понять и опробовать хорошие методы управления потоком для переключения между синхронным и асинхронным процессы.

Я хочу сделать следующее ...

  • загрузить файл json-data в data-dict
    • в сторону: data-dict выглядит как {'key_a': {(info_dict_a)}, 'key_b': {info_dict_b}}
  • у каждого из n-работников ...
    • доступ к этому data-dict, чтобы найти следующую запись-to-process info-dict
    • подготовить некоторые данные из обрабатываемой записи и опубликовать данные в URL
    • обработать пост-ответ для обновления ключа 'response' в info-dict, обрабатываемом записью
    • обновить data-dict ключом info-dict
    • перезаписать исходный файл json-данных обновленным data-dict

В сторону: я знаю, что есть и другие способы достижения моей общей цели, кроме неуклюжего повторного перезаписи файла json - но я не спрашиваю об этом; Я действительно хотел бы понять трио достаточно хорошо, чтобы иметь возможность использовать его для этого потока.

Итак, процессы, которые я хочу синхронизировать:

  • информация о следующей записи для обработки
  • обновление data-dict
  • перезапись исходного файла json-данных обновленным data-dict

Новичок в трио, у меня есть рабочий код здесь ... который, я считаю, - это синхронное получение следующей записи для обработки (с использованием метода trio.Semaphore () ). Но я почти уверен, что не сохраняю файл синхронно.

Learning Go Несколько лет назад я почувствовал, что ухватился за подходы к переплетению синхронных и асинхронных вызовов - но пока не с трио. Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 21 июня 2019

Этот код использует каналы для мультиплексирования запросов к пулу работников и обратно. Я обнаружил дополнительное требование (в ваших комментариях к коду) о том, что скорость пост-ответа регулируется, поэтому read_entries спит после каждого send.

from random import random    
import time, asks, trio    

snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)    

async def read_entries():
    async with snd_input:
        for key_entry in range(10):
            print("reading", key_entry)    
            await snd_input.send(key_entry)    
            await trio.sleep(1)    

async def work(n):
    async for key_entry in rcv_input:    
        print(f"w{n} {time.monotonic()} posting", key_entry)    
        r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
        await snd_output.send((r.status_code, key_entry))

async def save_entries():    
    async for entry in rcv_output:    
        print("saving", entry)    

async def main():    
    async with trio.open_nursery() as nursery:
        nursery.start_soon(read_entries)    
        nursery.start_soon(save_entries)    
        async with snd_output:
            async with trio.open_nursery() as workers:
                for n in range(3):
                    workers.start_soon(work, n)

trio.run(main)
1 голос
/ 21 июня 2019

Вот как я бы написал (псевдо-) код:

    async def process_file(input_file):
        # load the file synchronously
        with open(input_file) as fd:
            data = json.load(fd)

        # iterate over your dict asynchronously
        async with trio.open_nursery() as nursery:
            for key, sub in data.items():
                if sub['updated'] is None:
                    sub['updated'] = 'in_progress'
                    nursery.start_soon(post_update, {key: sub})

        # save your result json synchronously
        save_file(data, input_file)

trio гарантирует вам, что после выхода из блока async with каждое задание , которое вы породили,завершите, так что вы можете безопасно сохранить ваш файл, потому что больше не будет обновлений.

Я также удалил функцию grab_next_entry, потому что мне кажется, что эта функция будет перебирать те же ключи (постепенно) при каждом вызове (придавая сложности O (n!)), в то время как вы могли бы упростить ее, просто перебирая один раз ваш диктант (понижая сложность до O (n))

Вам также не нужен Semaphore, кроме случаев, еслиВы хотите ограничить количество параллельных вызовов post_update.Но trio также предлагает встроенный механизм для этого благодаря CapacityLimiter , который вы будете использовать следующим образом:

    limit = trio.CapacityLimiter(10)
    async with trio.open_nursery() as nursery:
        async with limit:
            for x in z:
                nursery.start_soon(func, x)

ОБНОВЛЕНИЕ благодаря @ njsmith *Комментарий 1023 *

Итак, чтобы ограничить количество одновременных post_update, вы перепишите его так:

    async def post_update(data, limit):
        async with limit:
            ...

И затем вы можете переписатьпредыдущий цикл выглядит так:

    limit = trio.CapacityLimiter(10)
    # iterate over your dict asynchronously
    async with trio.open_nursery() as nursery:
        for key, sub in data.items():
            if sub['updated'] is None:
                sub['updated'] = 'in_progress'
                nursery.start_soon(post_update, {key: sub}, limit)

Таким образом, мы создаем n задач для записей n в вашем data-dict, но если их больше 10задачи выполняются одновременно, тогда дополнительные должны будут ждать освобождения лимита (в конце блока async with limit).

...