I'm new to Airflow and I'm running into the following problem:
I have two DAGs defined in two separate files, and the second one should run after the first one has finished. To achieve this I used an ExternalTaskSensor to check whether the first DAG has run.
DAG - 1:
spark_dag = DAG(
    'SPARK_DAG',
    default_args=default_dag_args,
    schedule_interval=timedelta(days=1)
)

submit_spark_PROG_ACTN = BashOperator(
    task_id='Submit_PROG_ACTN',
    bash_command="gcloud dataproc jobs submit spark --cluster <CLUSTER_DETAILS>",
    dag=spark_dag)

submit_spark_PROG_ACTN
DAG - 2:
create_table_dag = DAG(
    'CreateTableDAG',
    default_args=default_dag_args,
    schedule_interval=timedelta(days=1)
)

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='dataset_name',
    table_id='PROG_ACTN',
    project_id='maximal-muse-11256',
    gcs_schema_object='<gcs_details>',
    google_cloud_storage_conn_id='google_cloud_datastore_default',
    dag=create_table_dag)

check_spark = ExternalTaskSensor(
    task_id='check_spark',
    external_dag_id='SPARK_DAG',
    external_task_id='Submit_PROG_ACTN',
    execution_delta=timedelta(minutes=10),
    timeout=300,
    dag=create_table_dag)

CreateTable << check_spark
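As far as I understand it (please correct me if this is wrong), execution_delta=timedelta(minutes=10) makes the sensor poke for a SPARK_DAG run whose execution_date is 10 minutes earlier than the sensor's own execution_date. A quick sanity check of the dates, which seems to match the "Poking for ..." lines in the log further down:

from datetime import datetime, timedelta

# execution_date of the failed CreateTableDAG.check_spark run (taken from the log below)
own_execution_date = datetime(2020, 7, 29, 0, 0)
# with execution_delta=timedelta(minutes=10) the sensor looks for this execution_date in SPARK_DAG
poked_execution_date = own_execution_date - timedelta(minutes=10)
print(poked_execution_date)  # 2020-07-28 23:50:00, the timestamp the sensor keeps poking for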
I'm running Airflow in a Cloud Composer instance on GCP. Composer version: composer-1.11.1, Airflow v1.10.9.
Now, when DAG-2 runs, it stays in the running state for a while and then fails. The log:
*** Reading remote log from gs://us-central1-schedulejobs-1ecdfd20-bucket/logs/CreateTableDAG/check_spark/2020-07-29T00:00:00+00:00/1.log.
[2020-07-30 12:03:38,864] {taskinstance.py:656} INFO - Dependencies all met for <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [queued]>
[2020-07-30 12:03:38,905] {taskinstance.py:656} INFO - Dependencies all met for <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [queued]>
[2020-07-30 12:03:38,906] {taskinstance.py:867} INFO -
--------------------------------------------------------------------------------
[2020-07-30 12:03:38,906] {taskinstance.py:868} INFO - Starting attempt 1 of 4
[2020-07-30 12:03:38,906] {taskinstance.py:869} INFO -
--------------------------------------------------------------------------------
[2020-07-30 12:03:38,943] {taskinstance.py:888} INFO - Executing <Task(ExternalTaskSensor): check_spark> on 2020-07-29T00:00:00+00:00
[2020-07-30 12:03:38,944] {base_task_runner.py:131} INFO - Running on host: airflow-worker-5557648985-67qrv
[2020-07-30 12:03:38,944] {base_task_runner.py:132} INFO - Running: ['airflow', 'run', 'CreateTableDAG', 'check_spark', '2020-07-29T00:00:00+00:00', '--job_id', '433', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/CreateTableDAG.py', '--cfg_path', '/tmp/tmp4zjj44qs']
[2020-07-30 12:03:40,939] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:40,939] {settings.py:255} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=570, pid=5986
[2020-07-30 12:03:41,549] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,548] {app.py:55} WARNING - Can't load Environment Variable overrides.
[2020-07-30 12:03:41,549] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/www/app.py", line 50, in <module>
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark with open('/home/airflow/gcs/env_var.json', 'r') as env_var_json:
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark FileNotFoundError: [Errno 2] No such file or directory: '/home/airflow/gcs/env_var.json'
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,549] {app.py:56} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-07-30 12:03:41,558] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,557] {configuration.py:618} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-07-30 12:03:41,575] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,575] {settings.py:255} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=570, pid=5986
[2020-07-30 12:03:41,966] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,965] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluster.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 12:03:41,967] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,967] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-07-30 12:03:41,968] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,967] {dagbag.py:401} INFO - Filling up the DagBag from /home/airflow/gcs/dags/CreateTableDAG.py
[2020-07-30 12:03:41,989] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark /usr/local/lib/airflow/airflow/utils/helpers.py:438: DeprecationWarning: Importing 'BashOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2020-07-30 12:03:41,990] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark DeprecationWarning)
[2020-07-30 12:03:41,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark /usr/local/lib/airflow/airflow/utils/helpers.py:438: DeprecationWarning: Importing 'PythonOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2020-07-30 12:03:41,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark DeprecationWarning)
[2020-07-30 12:03:42,480] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Running <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [running]> on host airflow-worker-5557648985-67qrv
[2020-07-30 12:03:42,542] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:42,542] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
[2020-07-30 12:04:42,622] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:04:42,622] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
[2020-07-30 12:05:42,686] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:05:42,686] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
[2020-07-30 12:06:42,774] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:06:42,774] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
[2020-07-30 12:07:42,845] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:07:42,845] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
[2020-07-30 12:08:42,937] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,936] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
[2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-07-30 12:08:42,991] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
[2020-07-30 12:08:42,991] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
[2020-07-30 12:08:42,992] {taskinstance.py:1158} INFO - Marking task as UP_FOR_RETRY
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark result = task_copy.execute(context=context)
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark raise AirflowSensorTimeout('Snap. Time is OUT.')
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,992] {taskinstance.py:1158} INFO - Marking task as UP_FOR_RETRY
[2020-07-30 12:08:43,030] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:08:43,030] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/bin/airflow", line 7, in <module>
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark exec(compile(f.read(), __file__, 'exec'))
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/bin/airflow", line 37, in <module>
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark args.func(args)
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/utils/cli.py", line 75, in wrapper
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark return f(*args, **kwargs)
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/bin/cli.py", line 545, in run
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark _run(args, dag, ti)
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/bin/cli.py", line 465, in _run
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark pool=args.pool,
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark return func(*args, **kwargs)
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark result = task_copy.execute(context=context)
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark raise AirflowSensorTimeout('Snap. Time is OUT.')
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
I can't figure out why the DAG is failing. The log says something about env_var.json not being found, but I don't use that file anywhere in my code. Am I missing something, or are those warnings the source of the problem?
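One thing I noticed: the timing of the failure seems to line up with the sensor's timeout=300 rather than with the warnings. Rough arithmetic, assuming the default poke_interval of 60 seconds:

# assuming ExternalTaskSensor's default poke_interval of 60 seconds
poke_interval = 60               # seconds (BaseSensorOperator default)
timeout = 300                    # seconds, as set on check_spark
print(timeout // poke_interval)  # 5 -> roughly 5 minutes of poking before AirflowSensorTimeout,
                                 # which matches the 12:03:42 -> 12:08:42 window in the log above

So my guess is that the sensor never finds a matching SPARK_DAG task instance and eventually times out, but I can't see why it wouldn't find it.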