Задача воздушного потока зависит от другого результата задачи? - PullRequest
0 голосов
/ 21 февраля 2019

У меня есть 2 задачи

первая задача загружает несколько наборов данных в folder_1

вторая задача очистки каждого файла на folder_1

CRUDE_NEW_DATASET_LOCAL являетсялокальный путь

download_crude_new_dataset = BashOperator(
        task_id = "download_crude_new_dataset",
        bash_command = bash.download_crude_new_dataset(),
        dag=dag
)

cleaning_crude_new_dataset = []
crude_new_dataset = glob(bash.CRUDE_NEW_DATASET_LOCAL+"/*",recursive=True)
for p in crude_new_dataset :
    path = p.replace('\\','/')
    if os.path.isfile(path):
        cleaning_crude_new_dataset.append(
            BashOperator(
                task_id = "cleaning_crude_new_dataset-"+bash._path_leaf_(path),
                bash_command = bash.cleaning_dataset(path),
                dag=dag
            )
    ) 

download_crude_new_dataset >> cleaning_crude_new_dataset

проблема, пока я запускаю метку потока воздуха, folder_1 все еще пуст.и это делает cleaning_crude_new_dataset (массив задач) пустыми.

спасибо за помощь

Ответы [ 2 ]

0 голосов
/ 04 марта 2019

эта проблема была решена

разбить задачу от одного Дага на несколько Дагов и вызвать другие Даги (dag1 >> dag2 >> dag3 ...)

при необходимости измените dag_dir_list_interval с 300 на небольшое число

0 голосов
/ 21 февраля 2019

Попробуйте так:

download_crude_new_dataset = BashOperator(
        task_id= "download_crude_new_dataset",
        bash_command= bash.download_crude_new_dataset(),
        dag= dag
)

#cleaning_crude_new_dataset = []

crude_new_dataset = glob( bash.CRUDE_NEW_DATASET_LOCAL + "/*", recursive= True )
for p in crude_new_dataset :
    path = p.replace( '\\', '/' )
    if os.path.isfile( path ):
        temp_task = BashOperator(
            task_id= "cleaning_crude_new_dataset-" + bash._path_leaf_( path ),
            bash_command= bash.cleaning_dataset( path ),
            dag= dag
        )
        #cleaning_crude_new_dataset.append( temp_task )
        download_crude_new_dataset.set_downstream( temp_task )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...