Как хорошо объяснено в , этот ответ , Queue.join
служит для информирования производителя, когда вся работа, введенная в очередь, завершена. Поскольку ваша первая очередь не не знает , когда выполняется конкретный элемент (он умножается и распределяется по другим очередям), join
не подходит вам.
Судя по вашему код, кажется, что ваши работники должны работать только столько времени, сколько требуется для обработки начальных элементов очереди. Если это так, то вы можете использовать страж останова, чтобы дать сигнал рабочим выйти. Например:
async with aiohttp.ClientSession() as session:
# ... create tasks as above ...
for i in IDs:
queue1.put_nowait(i)
queue1.put_nowait(None) # no more work
await asyncio.gather(*tasks)
Это похоже на ваш исходный код, но с явным запросом на отключение. Рабочие должны обнаружить дозорного и реагировать соответствующим образом: распространить его на следующую очередь / рабочего и выйти. Например, в worker1
:
while True:
item = queue1.get()
if item is None:
# done with processing, propagate sentinel to worker2 and exit
await queue2.put(None)
break
# ... process item as usual ...
То же самое с двумя другими работниками (за исключением worker3
, который не будет распространяться из-за отсутствия следующей очереди), приведет к выполнению всех трех задач после завершения работа сделана. Поскольку очереди FIFO, рабочие могут безопасно выйти после встречи со стражем, зная, что ни один предмет не был сброшен. Явное завершение работы также отличает очередь выключения от той, которая оказывается пустой, что препятствует преждевременному выходу рабочих из-за временно пустой очереди.
Этот метод на самом деле продемонстрирован в документация Queue
, но этот пример несколько сбивает с толку и использование Queue.join
и использование сторожевого устройства завершения работы. Эти два отдельных и могут использоваться независимо друг от друга. (И может также иметь смысл использовать их вместе, например, использовать Queue.join
, чтобы дождаться «вехи», а затем поместить другие вещи в очередь, оставляя за собой стража для остановки рабочих.)