Как я могу контролировать, сколько экземпляров задач работает в любой момент времени с Asyncio? - PullRequest
0 голосов
/ 13 июня 2019

Я сделал две системы очередей с Asyncio.

Responder берет список ссылок, запрашивает ответ для каждой и помещает результат в очередь. Parser берет ответ из очереди, анализирует его и добавляет в другую очередь. Submitter получает проанализированный объект из очереди и отправляет его в базу данных.

Код ниже показывает, как я создаю задачи. Для Submitter и Parser я создаю 100 экземпляров. Кажется, проблема в том, что как только Submitter достигает 100 экземпляров, вот и все - очередь отправки только начинает резервное копирование. Это в основном перестает делать свою работу. Ничто больше не отправляется. Responder и Parser обычно продолжают.

Как только submitter выполнил свою работу, как мне его переработать? Я не хочу обязательно создавать responder для каждой ссылки в моем списке. Это часть моего кода, которую я не до конца понимаю - for s in submitters: s.cancel() Это убивает мои экземпляры после , когда все сделано, или после , когда экземпляр сделал свою работу?

async def bulk_submit(not_submitted: set, **kwargs):
    parse_queue = asyncio.Queue()
    submit_queue = asyncio.Queue()
    headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36"}

    timeout = aiohttp.ClientTimeout(total=60*60)
    async with ClientSession(headers=headers, timeout=timeout) as session:
        tasks=[]
        i=0
        for link in not_submitted:
            i+=1
            tasks.append(
                responder(f'RESPONDER-{i}',url=link, session=session, parse_queue=parse_queue, **kwargs)
            )

        parsers = [asyncio.create_task(parser(f'PARSER-{n}', parse_queue=parse_queue, submit_queue=submit_queue)) for n in range (100)]
        submitters = [asyncio.create_task(submitter(f'SUBMITTER-{n}', submit_queue=submit_queue,)) for n in range (100)]


        await asyncio.gather(*tasks)
        await parse_queue.join()
        await episode_queue.join()
        await submit_queue.join()
        for s in submitters:
            s.cancel()
        for p in parsers:
            p.cancel()

1 Ответ

0 голосов
/ 15 июня 2019

Кажется, проблема в том, что как только Submitter достигает 100 экземпляров, вот и все - очередь отправки только начинает резервное копирование.[...]

Как только submitter выполнил свою работу, как его переработать?

Вы не показали нам код submitter, поэтому трудно сказать, как решить проблему, или даже какую проблему вы решаете точно.Основываясь на показанном вами коде, можно догадаться, что отправитель просто возвращается после обработки одного элемента очереди.Вы фактически создаете 100 отправителей, работающих параллельно, но доступ к очереди их сериализует.Когда каждый из них выполняет свою работу, некому больше дренировать очередь отправки, и работа останавливается.

Чтобы это исправить, вам не нужно перезапускать отправителя, вам просто нужно изменить его напродолжайте снимать с очереди элементы очереди вместо того, чтобы выходить после получения.Это должно выглядеть следующим образом:

async def submitter(name, submit_queue):
    while True:
        item = await submit_queue.get()
        ... process the item ...

При такой настройке вам не нужно создавать 100 отправителей для параллельной работы, достаточно одного.(Если вы на самом деле не хотите некоторую степень параллелизма, то есть, в этом случае вы можете создать столько их, сколько вы хотите, чтобы они происходили параллельно.)

Это частьв конце моего кода, который я не совсем понимаю - for s in submitters: s.cancel() Это убивает мои экземпляры после того, как все сделано, или после того, как экземпляр сделал свою работу?

Я подозреваю, что в вашем кодеотмена невозможна, потому что все ваши submitter сопрограммы имеют выполнено к тому времени, когда вы звоните cancel() (отмена выполненного задания игнорируется).

Обычно идея состоит в том, чтобыубивайте бездействующих работников после того, как их работа закончена, и они больше не нужны.Например, если submitter содержит бесконечный цикл, как показано выше, отмена предотвратит бесконечное ожидание нового элемента очереди (и никогда его не получение) после возврата bulk_submit.

...