Python Trio настроил десятичное число рабочих - PullRequest
0 голосов
/ 21 июня 2019

Я работаю с trio, чтобы запустить асинхронную параллельную задачу, которая будет выполнять некоторую проверку в сети на разных сайтах. Я хотел бы иметь возможность выбирать, с каким количеством одновременно работающих работников я буду делить задачи. Для этого я написал этот код

async def run_task():
    s = trio.Session(connections=5)
    Total_to_check = to_check() / int(module().workers)
    line = 0
    if int(Total_to_check) < 1:
        Total_to_check = 1
        module().workers = int(to_check())
    for i in range(int(Total_to_check)):
        try:
            async with trio.open_nursery() as nursery:
                for x in range(int(module().workers)):                  
                        nursery.start_soon(python_worker, self, s, x, line)
                        line += 1


        except BlockingIOError as e:
            print("[Fatal Error]", str(e))
            continue            

В этом примере to_check() равно тому, сколько URL-адресов дано для извлечения данных, а module().workers равно количеству работающих одновременно рабочих.

Итак, если бы у меня было, скажем, 30 URL-адресов, и я ввел, что я хочу 10 одновременных задач, он будет извлекать данные из 10 URL-адресов одновременно и повторить процедуру 3 раза.

Теперь все это хорошо, пока Total_to_check (что равно числу URL-адресов, деленному на количество рабочих) не будет в десятичных дробях. Если у меня есть, скажем, 15 URL, и я прошу 10 рабочих, то этот код будет проверять только 10 URL. То же самое, если у меня есть 20 URL, но попросить 15 рабочих. Я мог бы сделать что-то вроде math.ceil (Total_to_check), но тогда он начнет пытаться проверять URL-адреса, которые не существуют.

Как я мог заставить это работать должным образом, чтобы, если у меня было 10 одновременных задач и 15 URL, он проверил первые 10 одновременно, а затем последние 5 одновременно, не пропуская URL? (или пытается проверить слишком много)

Спасибо!

1 Ответ

2 голосов
/ 21 июня 2019

Ну, вот и CapacityLimiter , который вы бы использовали следующим образом:

async def python_worker(self, session, workers, line, limit):
    async with limit:
        ...

Тогда вы можете упростить вашу run_task:

async def run_task():
    limit = trio.CapacityLimiter(10)
    s = trio.Session(connections=5)
    line = 0
    async with trio.open_nursery() as nursery:
        for x in range(int(to_check())):
            nursery.start_soon(python_worker, self, s, x, line, limit)
            line += 1      

Я считаю, что BlockingIOError должен был бы двигаться и внутри python_worker, потому что nursery.start_soon() не будет блокировать, это __aexit__ из nursery, которое автоматически ожидает в конце async with trio.open_nursery() as nursery блок.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...