Распределенные задачи Python с несколькими очередями - PullRequest
0 голосов
/ 11 мая 2018

Таким образом, проект, над которым я работаю, требует распределенной системы задач для обработки ресурсоемких задач. Это относительно просто, раскрутите сельдерей и бросьте все задачи в очередь, а сельдерей сделает все остальное.

Проблема, с которой я столкнулся, заключается в том, что каждому пользователю нужна своя очередь, и элементы в очереди каждого пользователя должны обрабатываться синхронно. Таким образом, если в очереди пользователей уже есть задача, ожидающая ее завершения, прежде чем разрешить работнику подобрать следующую.

Самое близкое, к чему я пришел, - это фиксированный набор очередей и назначение их пользователям. Затем задачи пользователей, отобранные сельдереями, фиксируются в определенной очереди с параллелизмом 1.

Проблема с этой системой заключается в том, что я не могу масштабировать своих работников для обработки невыполненных заданий пользователя.

Есть ли способ настроить сельдерей на то, что я хочу, или, возможно, существует другая система задач, которая делает то, что я хочу?

Edit:

В настоящее время я использую следующую команду, чтобы порождать моих работников из сельдерея с параллелизмом в фиксированный набор очередей

celery multi start 4 -A app.celery -Q:1 queue_1 -Q:2 queue_2 -Q:3 queue_3 -Q:4 queue_4 --logfile=celery.log --concurrency=1

Затем я сохраняю имя очереди в пользовательском объекте, и когда пользователь запускает процесс, я ставлю задачу в очередь, сохраненную в пользовательском объекте. Это дает мне мои синхронные задачи.

Недостатком является ситуация, когда несколько пользователей совместно используют очереди, что приводит к созданию задач и их обработке никогда не будет.

Мне бы хотелось, чтобы было 5 рабочих и очередь на пользовательский объект. Затем пусть рабочие просто перепрыгивают через очереди, но никогда не имеют более одного работника в одной очереди за раз.

1 Ответ

0 голосов
/ 11 мая 2018

Я использую chain документ здесь условие для выполнения задачи в определенном порядке:

chain = task1_task.si(account_pk) | task2_task.si(account_pk) | task3_task.si(account_pk)
chain()

Итак, я выполняю задание для конкретного пользователя1, когда оно закончено, я выполняю задачу2 икогда закончите, выполните задачу3.Он появится у любого работника:)

Для остановки цепи на полпути:

self.request.callbacks = None
return

И не забудьте связать свою задачу:

@app.task(bind=True)
def task2_task(self, account_pk):
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...