Предполагая, что вы действительно имели в виду что-то вроде этого:
Task1 -> send to QueueOne
Task2 -> sent to QueueTwo
TaskFirehose -> send to QueueFirehose
тогда:
Worker1 -> consume from QueueOne, QueueFirehose
Worker2 -> consume from QueueTwo, QueueFirehose
WorkerFirehose -> consume from QueueFirehose
Возможно, это не совсем то, что вы имели в виду, но я думаю, что это должно охватывать многие сценарии, и, надеюсь, ваш тоже.
Примерно так должно работать:
# Advanced example starting 10 workers in the background:
# * Three of the workers processes the images and video queue
# * Two of the workers processes the data queue with loglevel DEBUG
# * the rest processes the default' queue.
$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
-Q default -L:4,5 DEBUG
Дополнительные параметры и справка: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html
Это было прямо из документации.
У меня тоже была похожая ситуация, и я решил ее немного по-другому. Я не мог использовать мульти сельдерея с наблюдателем.
Поэтому вместо этого я создал несколько программ в супервизоре для каждого работника. В любом случае, рабочие будут работать на разных процессах, так что просто дайте руководителю позаботиться обо всем за вас.
Файл конфигурации выглядит примерно так: -
; ==================================
; celery worker supervisor example
; ==================================
[program:Worker1]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueOne, QueueFirehose
directory=/path/to/project
user=nobody
numprocs=1
stdout_logfile=/var/log/celery/worker1.log
stderr_logfile=/var/log/celery/worker1.log
autostart=true
autorestart=true
startsecs=10
; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600
; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true
; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998
Аналогично, для Worker2 и WorkerFirehose отредактируйте соответствующие строки так:
[program:Worker2]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueTwo, QueueFirehose
и
[program:WorkerFirehose]
; Set full path to celery program if using virtualenv
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose
включите их все в файл supervisord.conf, и это должно быть сделано.