DAG using ExternalTaskSensor fails with "env_var.json not found" error
1 vote
/ 03 August 2020

I'm new to Airflow and I'm running into the following problem.

I have two DAGs in two separate files, and the second one should run only after the first one has finished. To achieve this I used an ExternalTaskSensor to check whether the first DAG has run.

DAG 1:

spark_dag = DAG(
    'SPARK_DAG',
    default_args=default_dag_args,
    schedule_interval=timedelta(days=1)
)

submit_spark_PROG_ACTN = BashOperator(
    task_id='Submit_PROG_ACTN',
    bash_command="gcloud dataproc jobs submit spark --cluster <CLUSTER_DETAILS>",
    dag=spark_dag)

submit_spark_PROG_ACTN

DAG 2:

create_table_dag = DAG(
    'CreateTableDAG',
    default_args=default_dag_args,
    schedule_interval=timedelta(days=1)
)

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='dataset_name',
    table_id='PROG_ACTN',
    project_id='maximal-muse-11256',
    gcs_schema_object='<gcs_details>',
    google_cloud_storage_conn_id='google_cloud_datastore_default',
    dag=create_table_dag)

check_spark = ExternalTaskSensor(
    task_id='check_spark',
    external_dag_id='SPARK_DAG',
    external_task_id='Submit_PROG_ACTN',
    execution_delta=timedelta(minutes=10),
    timeout=300,
    dag=create_table_dag)

CreateTable << check_spark
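
(The import lines are omitted from the snippets above. On Airflow 1.10.x they would presumably look like the sketch below; default_dag_args is defined elsewhere in each file and is not shown here.)

# Presumed imports for Airflow 1.10.x; default_dag_args is assumed to be
# defined separately in each DAG file.
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator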

I'm running Airflow in a Cloud Composer instance on GCP. Composer version: composer-1.11.1, Airflow v1.10.9.

Now when DAG 2 runs, it stays in the running state for a while and then fails. The log:

*** Reading remote log from gs://us-central1-schedulejobs-1ecdfd20-bucket/logs/CreateTableDAG/check_spark/2020-07-29T00:00:00+00:00/1.log.
[2020-07-30 12:03:38,864] {taskinstance.py:656} INFO - Dependencies all met for <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [queued]>
[2020-07-30 12:03:38,905] {taskinstance.py:656} INFO - Dependencies all met for <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [queued]>
[2020-07-30 12:03:38,906] {taskinstance.py:867} INFO -
--------------------------------------------------------------------------------
[2020-07-30 12:03:38,906] {taskinstance.py:868} INFO - Starting attempt 1 of 4
[2020-07-30 12:03:38,906] {taskinstance.py:869} INFO -
--------------------------------------------------------------------------------
[2020-07-30 12:03:38,943] {taskinstance.py:888} INFO - Executing <Task(ExternalTaskSensor): check_spark> on 2020-07-29T00:00:00+00:00
[2020-07-30 12:03:38,944] {base_task_runner.py:131} INFO - Running on host: airflow-worker-5557648985-67qrv
[2020-07-30 12:03:38,944] {base_task_runner.py:132} INFO - Running: ['airflow', 'run', 'CreateTableDAG', 'check_spark', '2020-07-29T00:00:00+00:00', '--job_id', '433', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/CreateTableDAG.py', '--cfg_path', '/tmp/tmp4zjj44qs']
[2020-07-30 12:03:40,939] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:40,939] {settings.py:255} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=570, pid=5986
[2020-07-30 12:03:41,549] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,548] {app.py:55} WARNING - Can't load Environment Variable overrides.
[2020-07-30 12:03:41,549] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/www/app.py", line 50, in <module>
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     with open('/home/airflow/gcs/env_var.json', 'r') as env_var_json:
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark FileNotFoundError: [Errno 2] No such file or directory: '/home/airflow/gcs/env_var.json'
[2020-07-30 12:03:41,550] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,549] {app.py:56} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-07-30 12:03:41,558] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,557] {configuration.py:618} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-07-30 12:03:41,575] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,575] {settings.py:255} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=570, pid=5986
[2020-07-30 12:03:41,966] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,965] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluster.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 12:03:41,967] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,967] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-07-30 12:03:41,968] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:41,967] {dagbag.py:401} INFO - Filling up the DagBag from /home/airflow/gcs/dags/CreateTableDAG.py
[2020-07-30 12:03:41,989] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark /usr/local/lib/airflow/airflow/utils/helpers.py:438: DeprecationWarning: Importing 'BashOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2020-07-30 12:03:41,990] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   DeprecationWarning)
[2020-07-30 12:03:41,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark /usr/local/lib/airflow/airflow/utils/helpers.py:438: DeprecationWarning: Importing 'PythonOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2020-07-30 12:03:41,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   DeprecationWarning)
[2020-07-30 12:03:42,480] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Running <TaskInstance: CreateTableDAG.check_spark 2020-07-29T00:00:00+00:00 [running]> on host airflow-worker-5557648985-67qrv
[2020-07-30 12:03:42,542] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:42,542] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:04:42,622] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:04:42,622] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:05:42,686] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:05:42,686] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:06:42,774] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:06:42,774] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:07:42,845] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:07:42,845] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:08:42,937] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,936] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ... 
[2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
    raise AirflowSensorTimeout('Snap. Time is OUT.')
airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-07-30 12:08:42,991] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.
[2020-07-30 12:08:42,991] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
[2020-07-30 12:08:42,992] {taskinstance.py:1158} INFO - Marking task as UP_FOR_RETRY
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     result = task_copy.execute(context=context)
[2020-07-30 12:08:42,992] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     raise AirflowSensorTimeout('Snap. Time is OUT.')
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.
[2020-07-30 12:08:42,993] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:08:42,992] {taskinstance.py:1158} INFO - Marking task as UP_FOR_RETRY
[2020-07-30 12:08:43,030] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark Traceback (most recent call last):
[2020-07-30 12:08:43,030] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/bin/airflow", line 7, in <module>
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     exec(compile(f.read(), __file__, 'exec'))
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/bin/airflow", line 37, in <module>
[2020-07-30 12:08:43,031] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     args.func(args)
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/utils/cli.py", line 75, in wrapper
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     return f(*args, **kwargs)
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 545, in run
[2020-07-30 12:08:43,032] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     _run(args, dag, ti)
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 465, in _run
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     pool=args.pool,
[2020-07-30 12:08:43,033] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     return func(*args, **kwargs)
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_task
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     result = task_copy.execute(context=context)
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark   File "/usr/local/lib/airflow/airflow/sensors/base_sensor_operator.py", line 116, in execute
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark     raise AirflowSensorTimeout('Snap. Time is OUT.')
[2020-07-30 12:08:43,034] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark airflow.exceptions.AirflowSensorTimeout: Snap. Time is OUT.

I can't figure out why the DAG is failing. It says something about env_var.json not being found, but I don't use that anywhere in my code. Am I missing something, or are the warnings the source of the problem?

1 Answer

0 votes
/ 03 August 2020

In your sensor you set the parameter timeout=300:

check_spark = ExternalTaskSensor(
    task_id='check_spark',
    external_dag_id='SPARK_DAG',
    external_task_id='Submit_PROG_ACTN',
    execution_delta=timedelta(minutes=10),
    timeout=300,
    dag=create_table_dag)

This parameter is specified in seconds, so the sensor timed out and failed after 5 minutes:

[2020-07-30 12:03:42,542] {base_task_runner.py:113} INFO - Job 433: Subtask check_spark [2020-07-30 12:03:42,542] {external_task_sensor.py:117} INFO - Poking for SPARK_DAG.submit_spark_PROG_ACTN on 2020-07-28T23:50:00+00:00 ...
...
[2020-07-30 12:08:42,989] {taskinstance.py:1135} ERROR - Snap. Time is OUT.

I'd recommend increasing the timeout so that it comfortably covers the runtime of the task in DAG 1, as in the sketch below.
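
A minimal sketch of the same sensor with a longer timeout. The 3600-second value, poke_interval and mode='reschedule' are illustrative assumptions, not the original settings; pick a timeout that exceeds how long the Spark job actually takes:

check_spark = ExternalTaskSensor(
    task_id='check_spark',
    external_dag_id='SPARK_DAG',
    external_task_id='Submit_PROG_ACTN',
    execution_delta=timedelta(minutes=10),
    timeout=3600,          # seconds; 1 hour instead of 5 minutes (value is an assumption)
    poke_interval=60,      # check the external task once a minute
    mode='reschedule',     # release the worker slot between pokes (available since Airflow 1.10.2)
    dag=create_table_dag)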

...