Я бы также подумал об использовании чего-то другого, кроме использования базы данных в качестве «брокера».Это действительно не подходит для такой работы.
Хотя некоторые из этих издержек можно вывести из цикла запрос / ответ, запустив задачу для создания других задач:
from celery.task import TaskSet, task
from myapp.models import MyModel
@task
def process_object(pk):
obj = MyModel.objects.get(pk)
# do something with obj
@task
def process_lots_of_items(ids_to_process):
return TaskSet(process_object.subtask((id, ))
for id in ids_to_process).apply_async()
Кроме того, поскольку у вас, вероятно, нет 15000 процессоров для параллельной обработки всех этих объектов, вы можете разбить объекты на куски, скажем, 100 или 1000:
from itertools import islice
from celery.task import TaskSet, task
from myapp.models import MyModel
def chunks(it, n):
for first in it:
yield [first] + list(islice(it, n - 1))
@task
def process_chunk(pks):
objs = MyModel.objects.filter(pk__in=pks)
for obj in objs:
# do something with obj
@task
def process_lots_of_items(ids_to_process):
return TaskSet(process_chunk.subtask((chunk, ))
for chunk in chunks(iter(ids_to_process),
1000)).apply_async()