DAG работает успешно, но в пользовательском интерфейсе Airflow Webserver DAG недоступен / DAG не активируется в Google Cloud Composer - PullRequest
1 голос
/ 28 марта 2019

Ниже приведен код DAG воздушного потока.Он отлично работает как при локальном размещении воздушного потока, так и в облачном компоновщике.Однако сам DAG не активируется в пользовательском интерфейсе Composer.Я нашел похожий вопрос и попробовал принятый ответ, связанный с этим вопросом .Моя проблема похожа.

import airflow
from airflow import DAG

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator

from datetime import datetime, timedelta
import sys

#copy this package to dag directory in GCP composer bucket
from schemas.schemaValidator import loadSchema
from schemas.schemaValidator import sparkArgListToMap

#change these paths to point to GCP Composer data directory

## cluster config
clusterConfig= loadSchema("somePath/jobConfig/cluster.yaml","cluster")

##per job yaml config
autoLoanCsvToParquetConfig= loadSchema("somePath/jobConfig/job.yaml","job")

default_args= {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2019, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=3)
}

dag= DAG('usr_job', default_args=default_args, schedule_interval=None)

t1= DummyOperator(task_id= "start", dag=dag)

t2= DataprocClusterCreateOperator(
    task_id= "CreateCluster",
    cluster_name= clusterConfig["cluster"]["cluster_name"],
    project_id= clusterConfig["project_id"],
    num_workers= clusterConfig["cluster"]["worker_config"]["num_instances"],
    image_version= clusterConfig["cluster"]["dataproc_img"],
    master_machine_type= clusterConfig["cluster"]["worker_config"]["machine_type"],
    worker_machine_type= clusterConfig["cluster"]["worker_config"]["machine_type"],
    zone= clusterConfig["region"],
    dag=dag
)

t3= DataProcSparkOperator(
    task_id= "csvToParquet",
    main_class= autoLoanCsvToParquetConfig["job"]["main_class"],
    arguments= autoLoanCsvToParquetConfig["job"]["args"],
    cluster_name= clusterConfig["cluster"]["cluster_name"],
    dataproc_spark_jars= autoLoanCsvToParquetConfig["job"]["jarPath"],
    dataproc_spark_properties= sparkArgListToMap(autoLoanCsvToParquetConfig["spark_params"]),
    dag=dag
)

t4= DataprocClusterDeleteOperator(
    task_id= "deleteCluster",
    cluster_name= clusterConfig["cluster"]["cluster_name"],
    project_id= clusterConfig["project_id"],
    dag= dag
)

t5= DummyOperator(task_id= "stop", dag=dag)

t1>>t2>>t3>>t4>>t5

Пользовательский интерфейс выдает эту ошибку - "This DAG isn't available in the webserver DAG bag object. It shows up in this list because the scheduler marked it as active in the metadata database. "

И все же, когда я вручную запустил DAG на Composer, я обнаружил, что он успешно прошел через файлы журнала.

1 Ответ

0 голосов
/ 29 апреля 2019

Проблема была с path, который был предоставлен для получения файлов конфигурации.Я указывал путь к папке data в GCS.Согласно документации Google, только папка dags синхронизируется со всеми узлами, а не папка data.

Само собой разумеется, что это была проблема, возникшая во время синтаксического анализа, поэтому она не появляласьправильно на пользовательском интерфейсе.Что еще более интересно, эти сообщения отладки не были выставлены на Composer 1.5 и более ранних.Теперь они доступны конечному пользователю, чтобы помочь в отладке.В любом случае, спасибо всем, кто помог.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...