Как отметить успех, даже если одна из задач не удалась - PullRequest
0 голосов
/ 23 октября 2019

Мне нужно иметь возможность отмечать успех работы, даже если первая задача не удалась. Первая задача - это задача перемещения файла, и, если нет файла, произойдет сбой, но успех будет достигнут,если нет файла, задача не выполняется. Я не беспокоюсь об этом. То, что я хочу сделать, - если нет файла, тогда отметьте успех. в любом случае он снова будет работать через 24 часа.

import airflow
from airflow import models
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['email@gmail.com'],
    'email_on_failure': True,
    'email_on_success': True,
    'schedule_interval': 'None',
}

dag = models.DAG(
    dag_id='Data_Ingestion',
    default_args=args
)

# [START load Data File to STAGING]
load_File_to_Data_RAW = GoogleCloudStorageToBigQueryOperator(
    task_id='Data_GCS_to_GBQ_Raw',
    bucket='Data_files',
    source_objects=['To_Process/*.txt'],
    destination_project_dataset_table='Data.Data_RAW',
    schema_fields=[
        {'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    field_delimiter='§',
    write_disposition='WRITE_TRUNCATE',
    google_cloud_storage_conn_id='GCP_Data_Staging',
    bigquery_conn_id='GCP_Data_Staging',
    soft_fail=True,
    dag=dag)
# [END howto_operator_gcs_to_bq]

Set_Staging = BigQueryOperator(
    task_id='Set_Data_Staging',
    bql='Select * from `db.schema.table1`;',
    bigquery_conn_id='GCP_Data_Staging',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    destination_dataset_table='db.schema.table2',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
    )


Populate_warehouse = BigQueryOperator(
    task_id='Update_warehouse_Table',
    bql='./SQL/Attom/PROPERTY.sql',
    bigquery_conn_id='GCP_Data_Staging',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
    )


Populate_Data_Details = BigQueryOperator(
    task_id='Update_warehouse_Detail_Table',
    bql='./SQL/Data/warehouse_DETAIL.sql',
    bigquery_conn_id='GCP_Data_Staging',
    use_legacy_sql=False,
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
    )

# [START move files to Archive]
archive_Data_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='Archive_Data_Files',
    source_bucket='warehouse_files',
    source_object='To_Process/*.txt',
    destination_bucket='warehouse_files',
    destination_object='Archive/',
    move_object=True,
    google_cloud_storage_conn_id='GCP_Mother_Staging',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

finished = DummyOperator(
    task_id='Finished',
    dag=dag)

# [END move files to archive]

load_File_to_Data_RAW >> Set_Staging >> Populate_warehouse >> Populate_warehouse_Details >> archive_Data_files >> finished
load_File_to_Data_RAW >> finished

...