В настоящее время я использую CeleryExecutor для отправки групп доступности баз данных и запускаю веб-сервер, планировщик и работника в одном контейнере на AWS. Когда я отправляю группы DAG для запуска, и их задачи находятся в очереди Redis (с использованием Elasticache), рабочие из Celery выполняют задачи, но имя просто airflow.executors.celery_executor.execute_command
. Как я могу получить эти аргументы? ![enter image description here](https://i.stack.imgur.com/KssmW.png)
В настоящее время это важные разделы airflow.cfg
:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor
[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
И в нашем файле DAG:
dag = DAG(
'foo_dag',
schedule_interval='0 2-23 * * *',
catchup=False,
default_args=default_args
)
t1 = PythonOperator(
task_id="bar_task",
provide_context=True,
python_callable=bar_task,
op_kwargs={"emr_cluster_id": emr_cluster_id},
task_concurrency=1,
dag=foo_dag
)