У меня есть 4 источника, которые я загружаю в dwh и использую поток воздуха для его организации. Чтобы автоматически сгенерировать этот DAG, я основываюсь на списке функций ETL и использую DummyOperators для указания зависимостей между функциями. Я перебираю для l oop список функций и собираю DAG. Для 3 из 4 источников последний манекен дает мне следующую ошибку:
[2020-01-08 10: 01: 19,818] { init .py: 51} ИНФОРМАЦИЯ - Использование executor LocalExecutor [2020-01-08 10: 01: 19,819] {dagbag.py:92} INFO - Заполнение DagBag из /vmm/scripts/airflow/dags/initwaterdwh.py Traceback (последний вызов был последним): Файл "/ usr / local / bin / airflow", строка 32, в args.fun c (args) Файл "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils /cli.py ", строка 74, в оболочке возвращает файл f (* args, ** kwargs)" /home/airflow/.local/lib/python3.6/site-packages/airflow/bin/cli.py ", строка 538, в файле run task = dag.get_task (task_id = args.task_id)" /home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py ", строка 1115 в get_task вызывает AirflowException («Task {task_id} не найден» .format (task_id = task_id)) airflow.exceptions.AirflowException: Task gdb_hub_done не найден
Предыдущие операторы-пустышки в этом же источнике выполнить правильно, как показано в age.
Мой код для создания этой группы обеспечения доступности баз данных выглядит следующим образом:
for x, shortname in enumerate(src_sn):
mtdschema = 'test_' + shortname + '_run_info'
src = src_bk[x]
previousDummy = DummyOperator(
task_id=src
)
updateRunInfo = PostgresOperator(
task_id='update_loadcycle_' + src,
sql="",
postgres_conn_id=conn_id,
database=targetdb
)
pdresSource = pdres[pdres["source"] == src]
layerNrs = sorted(set(pdresSource["layerNr"]))
insertNachtverwerkingStart >> updateRunInfo
updateHistoryStart = PostgresOperator(
task_id='update_loadhistory_' + src + '_start',
sql="",
postgres_conn_id=conn_id,
database=targetdb
)
previousDummy << updateHistoryStart
updateRunInfo >> updateHistoryStart
for layerNr in layerNrs:
currentLayerName = set(pdresSource[pdresSource["layerNr"] == layerNr]["layerName"]).pop()
currentDummy = DummyOperator(
task_id= src + "_" + currentLayerName + "_done"
)
for fileName in pdresSource[pdresSource["layerNr"] == layerNr]["reducedPath"]:
t = PostgresOperator(
task_id=fileName,
sql='',
postgres_conn_id=conn_id,
database=targetdb
)
t << previousDummy
t >> currentDummy
previousDummy = currentDummy
previousLayerNr = layerNr
updateHistoryStopSuccess = PostgresOperator(
task_id='update_loadhistory_' + src + '_success',
sql=""
postgres_conn_id=conn_id,
database=targetdb
)
updateHistoryStopFailure = PostgresOperator(
task_id='update_loadhistory_' + src + '_failure',
trigger_rule='one_failed',
sql="",
postgres_conn_id=conn_id,
database=targetdb
)
insertNachtverwerkingStopSuccess = PostgresOperator(
task_id='insert_nachtverwerking_success',
sql="",
postgres_conn_id='',
database=''
)
insertNachtverwerkingStopFailure = PostgresOperator(
task_id='insert_nachtverwerking_failure',
trigger_rule='one_failed',
sql='',
postgres_conn_id='',
database=''
)
previousDummy >> updateHistoryStopSuccess
previousDummy >> updateHistoryStopFailure
updateHistoryStopSuccess >> insertNachtverwerkingStopFailure
updateHistoryStopSuccess >> insertNachtverwerkingStopSuccess
insertNachtverwerkingStopSuccess >> datastageBV
Я попытался удалить группу обеспечения доступности баз данных и удалить метаданные через поток воздуха GUI и экземпляры задачи. потому что эти пустышки появляются в метаданных. Я немного застрял на том, почему именно это происходит.