Airflow + Celery + Flower - Имя задачи не является идентификатором задачи для оператора Python - PullRequest
0 голосов
/ 17 марта 2019

В настоящее время я использую CeleryExecutor для отправки групп доступности баз данных и запускаю веб-сервер, планировщик и работника в одном контейнере на AWS. Когда я отправляю группы DAG для запуска, и их задачи находятся в очереди Redis (с использованием Elasticache), рабочие из Celery выполняют задачи, но имя просто airflow.executors.celery_executor.execute_command. Как я могу получить эти аргументы? enter image description here

В настоящее время это важные разделы airflow.cfg:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

И в нашем файле DAG:

dag = DAG(
    'foo_dag',
    schedule_interval='0 2-23 * * *',
    catchup=False,
    default_args=default_args
)

t1 = PythonOperator(
    task_id="bar_task",
    provide_context=True,
    python_callable=bar_task,
    op_kwargs={"emr_cluster_id": emr_cluster_id},
    task_concurrency=1,
    dag=foo_dag
)
...