У меня есть следующий DAG, определенный в коде:
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.contrib.operators.ecs_operator import ECSOperator
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2018, 9, 24, 10, 00, 00)
}
dag = DAG(
'data-push',
default_args=default_args,
schedule_interval='0 0 * * 1,4',
)
colors = ['blue', 'red', 'yellow']
for color in colors:
ECSOperator(dag=dag,
task_id='data-push-for-%s' % (color),
task_definition= 'generic-push-colors',
cluster= 'MY_ECS_CLUSTER_ARN',
launch_type= 'FARGATE',
overrides={
'containerOverrides': [
{
'name': 'push-colors-container',
'command': [color]
}
]
},
region_name='us-east-1',
network_configuration={
'awsvpcConfiguration': {
'securityGroups': ['MY_SG'],
'subnets': ['MY_SUBNET'],
'assignPublicIp': "ENABLED"
}
},
)
Это должно создать DAG с 3 задачами, по одной для каждого цвета в моем списке цветов.
Это кажется хорошим, когда Я запускаю:
airflow list_dags
Я вижу свой даг в списке:
data-push
И когда я запускаю:
airflow list_tasks data-push
Я вижу, что мои три задачи отображаются так, как должны :
data-push-for-blue
data-push-for-red
data-push-for-yellow
Затем я тестирую запуск одной из моих задач, вводя в терминал следующее:
airflow run data-push data-push-for-blue 2017-1-23
И это запускает задачу, которая появляется в моем кластере ECS на панель управления aws, поэтому я точно знаю, что задача выполняется в моем кластере ECS, и данные успешно передаются, и все в порядке.
Теперь, когда я пытаюсь запустить DAG data-pu sh из Пользовательский интерфейс Airflow - это то место, где я столкнулся с проблемой.
Я запускаю:
airflow initdb
, затем:
airflow webserver
и теперь go в пользовательский интерфейс воздушного потока at localhost: 8080.
Я вижу dag data-pu sh в списке dags, щелкните его, а затем, чтобы протестировать запуск всего dag, я нажимаю «Trigger DAG» кнопка. Я не добавляю конфигурацию json, а затем нажимаю «Триггер». В представлении в виде дерева для группы DAG отображается зеленый кружок справа от древовидной структуры, который, по-видимому, указывает на то, что группа DAG «работает». Но зеленый кружок остается там целую вечность, и когда я вручную проверяю свою панель управления ECS, я не вижу, чтобы задачи действительно выполнялись, поэтому после запуска DAG из пользовательского интерфейса Airflow ничего не происходит, несмотря на то, что задачи работают, когда я вручную запускаю их из интерфейса командной строки *. 1033 *
Я использую SequentialExecutor, если это имеет значение.
Мои две основные теории относительно того, почему запуск DAG ничего не делает при запуске отдельных задач из CLI, работают: возможно, мне что-то не хватает в мой код python, в котором я определяю dag (возможно, потому, что я не указываю какие-либо зависимости для задач?) или что я не запускаю планировщик воздушного потока, но если я вручную запускаю DAGS из пользовательского интерфейса Airflow, я не понимаю, почему планировщик должен быть запущен, и почему он не показывает мне ошибку, говоря, что это проблема.
Есть идеи?