Воздушный поток дает задание не найденное исключение - PullRequest
0 голосов
/ 08 января 2020

У меня есть 4 источника, которые я загружаю в dwh и использую поток воздуха для его организации. Чтобы автоматически сгенерировать этот DAG, я основываюсь на списке функций ETL и использую DummyOperators для указания зависимостей между функциями. Я перебираю для l oop список функций и собираю DAG. Для 3 из 4 источников последний манекен дает мне следующую ошибку:

[2020-01-08 10: 01: 19,818] { init .py: 51} ИНФОРМАЦИЯ - Использование executor LocalExecutor [2020-01-08 10: 01: 19,819] {dagbag.py:92} INFO - Заполнение DagBag из /vmm/scripts/airflow/dags/initwaterdwh.py Traceback (последний вызов был последним): Файл "/ usr / local / bin / airflow", строка 32, в args.fun c (args) Файл "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils /cli.py ", строка 74, в оболочке возвращает файл f (* args, ** kwargs)" /home/airflow/.local/lib/python3.6/site-packages/airflow/bin/cli.py ", строка 538, в файле run task = dag.get_task (task_id = args.task_id)" /home/airflow/.local/lib/python3.6/site-packages/airflow/models/dag.py ", строка 1115 в get_task вызывает AirflowException («Task {task_id} не найден» .format (task_id = task_id)) airflow.exceptions.AirflowException: Task gdb_hub_done не найден

Предыдущие операторы-пустышки в этом же источнике выполнить правильно, как показано в age.

Мой код для создания этой группы обеспечения доступности баз данных выглядит следующим образом:

for x, shortname in enumerate(src_sn):
​
        mtdschema = 'test_' + shortname + '_run_info'
        src = src_bk[x]
​
        previousDummy = DummyOperator(
            task_id=src
        )
​
        updateRunInfo = PostgresOperator(
            task_id='update_loadcycle_' + src,
            sql="",
            postgres_conn_id=conn_id,
            database=targetdb
        )
​
        pdresSource = pdres[pdres["source"] == src]
        layerNrs = sorted(set(pdresSource["layerNr"]))
​
        insertNachtverwerkingStart >> updateRunInfo
​
        updateHistoryStart = PostgresOperator(
            task_id='update_loadhistory_' + src + '_start',
            sql="",
            postgres_conn_id=conn_id,
            database=targetdb
        )
​
        previousDummy << updateHistoryStart
        updateRunInfo >> updateHistoryStart

        for layerNr in layerNrs:
​
            currentLayerName = set(pdresSource[pdresSource["layerNr"] == layerNr]["layerName"]).pop()

            currentDummy = DummyOperator(
                task_id= src + "_" + currentLayerName + "_done"
            )

            for fileName in pdresSource[pdresSource["layerNr"] == layerNr]["reducedPath"]:
                t = PostgresOperator(
                    task_id=fileName,
                    sql='',
                    postgres_conn_id=conn_id,
                    database=targetdb
                )
                t << previousDummy
                t >> currentDummy
​
            previousDummy = currentDummy
            previousLayerNr = layerNr
​
        updateHistoryStopSuccess = PostgresOperator(
            task_id='update_loadhistory_' + src + '_success',
            sql=""
            postgres_conn_id=conn_id,
            database=targetdb
        )
​
        updateHistoryStopFailure = PostgresOperator(
            task_id='update_loadhistory_' + src + '_failure',
            trigger_rule='one_failed',
            sql="",
            postgres_conn_id=conn_id,
            database=targetdb
        )
​
        insertNachtverwerkingStopSuccess = PostgresOperator(
            task_id='insert_nachtverwerking_success',
            sql="",
            postgres_conn_id='',
            database=''
        )
​
        insertNachtverwerkingStopFailure = PostgresOperator(
            task_id='insert_nachtverwerking_failure',
            trigger_rule='one_failed',
            sql='',
            postgres_conn_id='',
            database=''
        )
​
        previousDummy >> updateHistoryStopSuccess
        previousDummy >> updateHistoryStopFailure
​
        updateHistoryStopSuccess >> insertNachtverwerkingStopFailure
        updateHistoryStopSuccess >> insertNachtverwerkingStopSuccess
​
        insertNachtverwerkingStopSuccess >> datastageBV

Я попытался удалить группу обеспечения доступности баз данных и удалить метаданные через поток воздуха GUI и экземпляры задачи. потому что эти пустышки появляются в метаданных. Я немного застрял на том, почему именно это происходит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...