Воздушный поток - запустить задачу, если любой из них не удается - PullRequest
0 голосов
/ 27 сентября 2019

Я хотел бы иметь знак, в котором, если в какой-либо задаче возникает ошибка, он должен автоматически выполнить задачу, чтобы «сбросить» таблицу и завершить процесс.Пример:

#Task that needs to be performed if any of the above fails
drop_bq_op = BigQueryOperator(
    task_id='drop_bq',
    use_legacy_sql=False,
    allow_large_results=True,
    bql="""DELETE FROM dataset.table1 WHERE ID IS NOT NULL""",
    bigquery_conn_id='gcp',
    dag=dag)

#task1
MsSql = MsSqlToGoogleCloudStorageOperator(
    task_id='import',
    mssql_conn_id=mssql,
    google_cloud_storage_conn_id='gcp',
    sql=sql_query,
    bucket=nm_bucket,
    filename=nm_arquivo,
    schema_filename=sc_arquivo,
    dag=dag)

#task2
Google = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='bucket',
    source_objects=[nm_arquivo],
    destination_project_dataset_table=dataset_bq_tbl,
    schema_fields=sc_tbl_bq,
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition=wrt_disposition,
    time_partitioning=tp_particao,
    cluster_fields=nm_cluster,
    bigquery_conn_id='gcp',
    google_cloud_storage_conn_id='gcp',
    dag=dag
)

task_3 = BigQueryOperator(
    task_id='test3',
    use_legacy_sql=False,
    allow_large_results=True,
    bql="""select ...""",
    bigquery_conn_id='gcp',
    dag=dag)

ОБНОВЛЕНИЕ: Я включаю этот код в свой скрипт:

def delete_bigquery():
    """query bigquery to get data to import to PSQL"""
    client = bigquery.Client()
    query = "DELETE FROM dataset.table1 WHERE ID IS NOT NULL"
    dataset = client.dataset('dataset')
    table = dataset.table(name='table1')
    job_name = 'delete_{}'.format(uuid.uuid4())
    job = client.run_async_query(job_name, query)
    job.destination = table
    job.write_disposition = 'WRITE_TRUNCATE'
    job.begin()
    return job.state

cleanup_task = PythonOperator(task_id="cleanup_task",
                              python_callable=delete_bigquery,
                              trigger_rule=TriggerRule.ONE_FAILED,
                              dag=dag)

[gcs_to_bq.set_upstream(import), task_3.set_upstream(gcs_to_bq)] >> cleanup_task

И теперь эта ошибка появляется, когда я снова поднимаюсь вверх по дамбе:

Broken DAG: [dag.py] Отношения могут быть установлены только между операторами;получил NoneType

1 Ответ

0 голосов
/ 27 сентября 2019
  • Это классический случай использования TriggerRules

  • Вы можете создать свой cleanup_taskи подключите ко всем вышестоящим задачам (которые должны быть очищены ) и назначьте trigger_rule="one_failed" ему

# refer code here https://github.com/apache/airflow/blob/master/airflow/utils/trigger_rule.py#L28
from airflow.utils.trigger_rule import TriggerRule
..
cleanup_task = PythonOperator(dag_id="..",
                              task_id="cleanup_task"
                              ..
                              trigger_rule=TriggerRule.ONE_FAILED
                              ..)
..
# all tasks that must be cleaned-up should have `cleanup_task` in their downstream
[my_task_1, my_task_2, my_task_3] >> cleanup_task
...