Мне нужно иметь возможность отмечать успех работы, даже если первая задача не удалась. Первая задача - это задача перемещения файла, и, если нет файла, произойдет сбой, но успех будет достигнут,если нет файла, задача не выполняется. Я не беспокоюсь об этом. То, что я хочу сделать, - если нет файла, тогда отметьте успех. в любом случае он снова будет работать через 24 часа.
import airflow
from airflow import models
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'email': ['email@gmail.com'],
'email_on_failure': True,
'email_on_success': True,
'schedule_interval': 'None',
}
dag = models.DAG(
dag_id='Data_Ingestion',
default_args=args
)
# [START load Data File to STAGING]
load_File_to_Data_RAW = GoogleCloudStorageToBigQueryOperator(
task_id='Data_GCS_to_GBQ_Raw',
bucket='Data_files',
source_objects=['To_Process/*.txt'],
destination_project_dataset_table='Data.Data_RAW',
schema_fields=[
{'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'},
],
field_delimiter='§',
write_disposition='WRITE_TRUNCATE',
google_cloud_storage_conn_id='GCP_Data_Staging',
bigquery_conn_id='GCP_Data_Staging',
soft_fail=True,
dag=dag)
# [END howto_operator_gcs_to_bq]
Set_Staging = BigQueryOperator(
task_id='Set_Data_Staging',
bql='Select * from `db.schema.table1`;',
bigquery_conn_id='GCP_Data_Staging',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
destination_dataset_table='db.schema.table2',
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag
)
Populate_warehouse = BigQueryOperator(
task_id='Update_warehouse_Table',
bql='./SQL/Attom/PROPERTY.sql',
bigquery_conn_id='GCP_Data_Staging',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED',
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag
)
Populate_Data_Details = BigQueryOperator(
task_id='Update_warehouse_Detail_Table',
bql='./SQL/Data/warehouse_DETAIL.sql',
bigquery_conn_id='GCP_Data_Staging',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED',
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag
)
# [START move files to Archive]
archive_Data_files = GoogleCloudStorageToGoogleCloudStorageOperator(
task_id='Archive_Data_Files',
source_bucket='warehouse_files',
source_object='To_Process/*.txt',
destination_bucket='warehouse_files',
destination_object='Archive/',
move_object=True,
google_cloud_storage_conn_id='GCP_Mother_Staging',
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag
)
finished = DummyOperator(
task_id='Finished',
dag=dag)
# [END move files to archive]
load_File_to_Data_RAW >> Set_Staging >> Populate_warehouse >> Populate_warehouse_Details >> archive_Data_files >> finished
load_File_to_Data_RAW >> finished