Бекэнд Celery Redis - делает задачи в очереди существующими как элементы - PullRequest
0 голосов
/ 05 февраля 2019

Текущая настройка: сельдерей, работающий на док-контейнерах (с кодом нашего продукта) на узле EC2, задачи создания и обработки.Наш бэкэнд / брокер - Redis, работающий в эластичной боли AWS.

Цель: возможность видеть размер очереди в любой момент времени (аналогично мониторингу цветка), надеюсь, через AWS CloudWatch, но не требуется.Содержание задач не уместно, так как я знаком с созданием резервной копии экземпляра redis и могу анализировать резервную копию с помощью локальных инструментов для выполнения любого необходимого анализа.Краткосрочные исторические данные очень предпочтительны (CloudWatch возвращается за 2 недели и имеет гранулярность точек данных за 1 мин, это очень хорошо).

Исходя из того, что я знаю, что Flower работает, Flower не будет выполнимымиспользовать из-за количества групп безопасности / ограничений, которые у нас есть на данный момент.Кроме того, flower отслеживает только то, что вы находитесь на странице, поэтому исторические данные не сохраняются.

В Elasticache уже есть встроенный CloudWatch для количества элементов в redis.Это кажется мне лучшим путем для достижения цели.Однако в настоящее время очередь представляет один элемент в Redis (независимо от того, сколько задач находится в очереди).Вот пример резервной копии redis, проанализированной в json:

[{
"1.api_data__cached_api_route.000":"{\"i1\": 0, \"i2\": 1, \"i3\": 0}",
"1.api_data__cached_api_route.001":"{\"i1\": 0, \"i2\": 0, \"i3\": 0}",
"1.api_data__cached_api_route.002":"{\"i1\": 1, \"i2\": 1, \"i3\": 0}",
"staging_event_default":["{\"id\":\"b28b056c-1268-4577-af8a-9f1948860502\", \"task\":{...}}, "{\"id\":\"52668c46-3972-457a-be3a-6e27eedd26e3\", \"task\":{...}}]
}]

Cloudwatch видит это как 4 элемента, 3 кэшированных API-маршрута и 1 очередь.Даже если в очереди будет тысячи элементов, она все равно будет отображаться как 4 элемента.Расхождение между # (элементы в очереди) и # (элементы в очереди И другими кэшированными элементами) является хорошим, так как этот инструмент мониторинга будет в основном использоваться для определения того, будет ли очередь подвергаться ужасному резервному копированию, а размер очереди будет карликовымколичество кэшированных маршрутов API.

Чтобы продолжить по этому маршруту, самый простой ответ был бы, если у сельдерея есть опция конфигурации, позволяющая сделать каждый элемент в очереди своим собственным элементом redis.Если для этого есть простое исправление или опция конфигурации, кажется, это проще всего реализовать.Вот наши текущие параметры конфигурации сельдерея:

flask_app.config.update(
  CELERY_BROKER_URL=redis_host,
  CELERY_RESULT_BACKEND=redis_host,
  CELERY_QUEUES=queue_manager.queues,
  CELERY_ROUTES=queue_manager.routes,
  CELERY_DEFAULT_QUEUE=queue_manager.default_queue_name,
  CELERY_DEFAULT_EXCHANGE=queue_manager.default_exchange_name)

_celery = celery.Celery(flask_app.import_name,
  backend=flask_app.config['CELERY_RESULT_BACKEND'],
  broker=flask_app.config['CELERY_BROKER_URL'])

opts = {
  'broker_url': redis_host,
  'result_backed': redis_host,
  'task_queues': queue_manager.queues,
  'task_routes': queue_manager.routes,
  'task_default_queue': queue_manager.default_queue_name,
  'task_default_exchange': queue_manager.default_exchange_name,
  'worker_send_task_events': True,

  'task_ignore_result': True,
  'task_store_errors_even_if_ignored': True,
  'result_expires': 3600,

  'worker_redirect_stdouts': False,
  'beat_scheduler': redbeat.RedBeatScheduler,
  'beat_max_loop_interval': 5
}
_celery.conf.update(opts)

Другой вариант, с которым я столкнулся, это celery-cloudwatch-logs , который, кажется, соответствует тому, что я пытаюсьчтобы достичь, однако, кажется, больше нацелено на то, чтобы увидеть конкретное содержание каждой задачи, а не в совокупности (как бы там я ни ошибался).

Если не существует идеального / простого решения, которое соответствует цели,Я посмотрю на разветвление / сборку сельдерея-облачных часов, чтобы загрузить только соответствующую информацию.Наша команда унаследовала большую часть кода, который существует в настоящее время, и у меня есть общее представление о том, как работает сельдерей, но отнюдь не всесторонне.

Заранее благодарен за любые мысли, комментарии и помощь!

Ответы [ 2 ]

0 голосов
/ 08 марта 2019

Я опубликую то, что я сделал здесь, если кто-то столкнется с этим.

Мы уже установили и настроили boto3 для доступа к S3 в других местах приложения, что делает его довольно простым для публикации в CloudWatch.

Я добавил метод в наш класс Celery, чтобы проверить длину очередей, используя llen из модуля redis:

 @staticmethod
  def check_lengths():
    result = {}
    for q in Celery._queues:
      result[q] = Celery._redis.llen(q)
    return result

Затем публикация в Cloudwatch также была довольно простой.:

    namespace = "Celery/Queue"
    metrics = []
    for qname, qlen in data.items():
      metric = {}
      metric["MetricName"] = "ItemsInQueue"
      metric["Dimensions"] = [ {"Name": "QueueName", "Value": qname} ]
      metric["Value"] = qlen
      metric["Unit"] = "Count"

      metrics.append(metric)

    self.cw_client.put_metric_data(Namespace=namespace, MetricData=metrics)

Затем я в конечном итоге использовал AWS Lambda для отправки сетевого запроса на минуту конечной точке, которая затем разместила вышеуказанные данные в CloudWatch.

0 голосов
/ 07 февраля 2019

Чтобы увидеть длину очереди, используя посредник Redis, просто используйте llen в Redis.например, llen celery.

...