Кажется, проблема в том, что как только Submitter
достигает 100 экземпляров, вот и все - очередь отправки только начинает резервное копирование.[...]
Как только submitter
выполнил свою работу, как его переработать?
Вы не показали нам код submitter
, поэтому трудно сказать, как решить проблему, или даже какую проблему вы решаете точно.Основываясь на показанном вами коде, можно догадаться, что отправитель просто возвращается после обработки одного элемента очереди.Вы фактически создаете 100 отправителей, работающих параллельно, но доступ к очереди их сериализует.Когда каждый из них выполняет свою работу, некому больше дренировать очередь отправки, и работа останавливается.
Чтобы это исправить, вам не нужно перезапускать отправителя, вам просто нужно изменить его напродолжайте снимать с очереди элементы очереди вместо того, чтобы выходить после получения.Это должно выглядеть следующим образом:
async def submitter(name, submit_queue):
while True:
item = await submit_queue.get()
... process the item ...
При такой настройке вам не нужно создавать 100 отправителей для параллельной работы, достаточно одного.(Если вы на самом деле не хотите некоторую степень параллелизма, то есть, в этом случае вы можете создать столько их, сколько вы хотите, чтобы они происходили параллельно.)
Это частьв конце моего кода, который я не совсем понимаю - for s in submitters: s.cancel()
Это убивает мои экземпляры после того, как все сделано, или после того, как экземпляр сделал свою работу?
Я подозреваю, что в вашем кодеотмена невозможна, потому что все ваши submitter
сопрограммы имеют выполнено к тому времени, когда вы звоните cancel()
(отмена выполненного задания игнорируется).
Обычно идея состоит в том, чтобыубивайте бездействующих работников после того, как их работа закончена, и они больше не нужны.Например, если submitter
содержит бесконечный цикл, как показано выше, отмена предотвратит бесконечное ожидание нового элемента очереди (и никогда его не получение) после возврата bulk_submit
.