Эффективное использование нескольких очередей Asyncio - PullRequest
2 голосов
/ 17 февраля 2020

В настоящее время я строю проект, который требует нескольких запросов к различным конечным точкам. Я обертываю эти запросы в Aiohttp, чтобы учесть asyn c.

Проблема:
У меня есть три очереди: queue1, queue2 и queue3. Кроме того, у меня есть три рабочие функции (worker1, worker2, worker3), которые связаны с их соответствующей очередью. Первая очередь заполняется немедленно идентификаторами списка, которые известны до запуска. Когда запрос завершен и данные переданы в базу данных, он передает идентификатор в queue2. A worker2 примет этот идентификатор и запросит дополнительные данные. На основе этих данных он начнет генерировать список идентификаторов (в отличие от идентификаторов в queue1/queue2. worker2 поместит идентификаторы в queue3. Наконец worker3 извлечет этот идентификатор из queue3 и запросит дополнительные данные перед фиксацией в базу данных.

Проблема возникает из-за того, что queue.join() является блокирующим вызовом. Каждый работник привязан к отдельной очереди, поэтому соединение для queue1 будет блокироваться до его завершения. Это нормально , но это также отрицает цель использования asyn c. Без использования join() программа не может определить, когда очереди полностью пусты. Другая проблема заключается в том, что могут быть тихие ошибки, когда одна из очередей пуста, но есть еще данные, которые еще не были добавлены.

Схема базового c кода выглядит следующим образом:

queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()

async with aiohttp.ClientSession() as session:
    for i in range(3):
        tasks.append(asyncio.create_task(worker1(queue1)))

    for i in range(3):
        tasks.append(asyncio.create_task(worker2(queue2)))

    for i in range(10):
        tasks.append(asyncio.create_task(worker3(queue3)))

    for i in IDs:
       queue1.put_nowait(i)

    await asyncio.gather(*tasks)

Рабочие функции находятся в бесконечном l oop ожидании элементы для входа в очередь.

Когда все данные будут обработаны, выхода не будет, и программа зависнет.

Существует ли способ эффективного управления рабочими и правильно закончится?

1 Ответ

2 голосов
/ 17 февраля 2020

Как хорошо объяснено в , этот ответ , Queue.join служит для информирования производителя, когда вся работа, введенная в очередь, завершена. Поскольку ваша первая очередь не не знает , когда выполняется конкретный элемент (он умножается и распределяется по другим очередям), join не подходит вам.

Судя по вашему код, кажется, что ваши работники должны работать только столько времени, сколько требуется для обработки начальных элементов очереди. Если это так, то вы можете использовать страж останова, чтобы дать сигнал рабочим выйти. Например:

async with aiohttp.ClientSession() as session:

    # ... create tasks as above ...

    for i in IDs:
       queue1.put_nowait(i)
    queue1.put_nowait(None)  # no more work

    await asyncio.gather(*tasks)

Это похоже на ваш исходный код, но с явным запросом на отключение. Рабочие должны обнаружить дозорного и реагировать соответствующим образом: распространить его на следующую очередь / рабочего и выйти. Например, в worker1:

while True:
    item = queue1.get()
    if item is None:
        # done with processing, propagate sentinel to worker2 and exit
        await queue2.put(None)
        break
    # ... process item as usual ...

То же самое с двумя другими работниками (за исключением worker3, который не будет распространяться из-за отсутствия следующей очереди), приведет к выполнению всех трех задач после завершения работа сделана. Поскольку очереди FIFO, рабочие могут безопасно выйти после встречи со стражем, зная, что ни один предмет не был сброшен. Явное завершение работы также отличает очередь выключения от той, которая оказывается пустой, что препятствует преждевременному выходу рабочих из-за временно пустой очереди.

Этот метод на самом деле продемонстрирован в документация Queue, но этот пример несколько сбивает с толку и использование Queue.join и использование сторожевого устройства завершения работы. Эти два отдельных и могут использоваться независимо друг от друга. (И может также иметь смысл использовать их вместе, например, использовать Queue.join, чтобы дождаться «вехи», а затем поместить другие вещи в очередь, оставляя за собой стража для остановки рабочих.)

...