Остановить обработку сельдерея задачами ИЛИ отслеживать «неиспользованные» задачи в Flower - PullRequest
0 голосов
/ 10 мая 2018

Предположим, что все мои задачи в очереди сельдерея работают с сторонним API.Тем не менее, API имеет ограничение по скорости, которое я отслеживаю (есть дневной и часовой ограничения, которые я должен соблюдать).Как только я достигну ограничения скорости, я хочу приостановить потребление новых задач, а затем возобновить, когда узнаю, что у меня все хорошо.

Я достиг этого, используя следующие две задачи:

@celery.task()
def cancel_api_queue(minutes_to_resume):
    resume_api_queue.apply_async(countdown=minutes_to_resume*60, queue='celery')
    celery.control.cancel_consumer('third_party', reply=True)

@celery.task(default_retry_delay=300, max_retries=5)
def resume_api_queue():
    celery.control.add_consumer('third_party', destination=['y@local'])

Затем я могу продолжать отправлять свои сторонние задачи API, и как только мой потребитель будет добавлен обратно, все мои задачи будут использованы.Отлично.

Однако, поскольку у меня нет получателя в этой очереди, этот кажется , что означает, что я больше не могу видеть задания, которые отправляются в Flower (домой потребитель добавляется).

Есть ли что-то, что я делаю не так?Могу ли я добиться этой «паузы» другим способом, позволяющим мне продолжать видеть отправленные задания в цветке?

ps, возможно, это связано с этой проблемой, но не уверен на 100%: https://github.com/celery/celery/issues/1452

Я использую amqp broker, если это имеет значение.

спасибо девочкам и мальчикам.

1 Ответ

0 голосов
/ 24 мая 2018

Я подозреваю, что просмотр содержимого сообщений очереди до того, как работник их заберет, на самом деле не является частью задуманного Цветком дизайна.Поэтому, если вы перестанете использовать задачи из очереди, лучшее, что может сделать Flower, - показать, сколько из них было помещено в очередь как одно число на панели «Брокер».

Один хакерский способ наблюдать за внутренними объектамииз входящих задач может быть добавление промежуточной фиктивной задачи «пересылки», которая просто пересылает сообщение из одной очереди (назовем это query_inbox) в другую (скажем, query_processing).

Напримерчто-то вроде:

@celery.task(queue='query_inbox')
def query(params):
    process_query.delay(params)

@celery.task(queue='query_processing')
def process_query(params):
    ... do rate-limited stuff ...

Теперь вы можете прекратить потребление задач с query_processing, но вы все равно сможете наблюдать их параметры, когда они проходят через query_inbox рабочий.

...