Ниже приведен рекомендуемый способ создания динамического c DAG или sub-DAG в потоке воздуха, хотя есть и другие способы, но я думаю, что это будет в значительной степени применимо к вашей проблеме.
Во-первых, создайте файл (yaml/csv)
, который включает в себя список всех файлов и местоположений s3
, в вашем случае вы написали функцию для сохранения их в списке, я бы сказал, сохраните их в отдельном файле yaml
и загрузите его при запуске время в воздушном потоке env, а затем создать DAG.
Ниже приведен пример файла yaml
: dynamicDagConfigFile.yaml
job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
- File1: 'S3Loc1'
- File2: 'S3Loc2'
- File3: 'S3Loc3'
Вы можете изменить функцию Check_For_Files
, чтобы сохранить их в yaml
file.
Теперь мы можем перейти к динамическому созданию c dag:
Сначала определим две задачи, используя фиктивные операторы: начало и конец задачи. Вот такие задачи, в которых мы собираемся опираться на DAG
, динамически создавая задачи между ними:
start = DummyOperator(
task_id='start',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag)
Dynami c DAG: мы будем использовать PythonOperators
в потоке воздуха. Функция должна получить в качестве аргументов идентификатор задачи; python функция, которая должна быть выполнена, то есть python_callable для оператора Python; и набор аргументов, которые будут использоваться во время выполнения.
Включите аргумент task id
. Таким образом, мы можем обмениваться данными между задачами, сгенерированными динамическим c способом, например, с помощью XCOM
.
. Вы можете указать свою операционную функцию в этом динамическом c dag, например, s3_to_gcs_op
.
def createDynamicDAG(task_id, callableFunction, args):
task = PythonOperator(
task_id = task_id,
provide_context=True,
#Eval is used since the callableFunction var is of type string
#while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable = eval(callableFunction),
op_kwargs = args,
xcom_push = True,
dag = dag,
)
return task
Наконец, на основе местоположения, присутствующего в файле yaml, вы можете создать динамические c дагс, сначала прочитайте файл yaml
, как показано ниже, и создайте динамические c dag:
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
# use safe_load instead to load the YAML file
configFile = yaml.safe_load(f)
#Extract file list
S3Files = configFile['S3Files']
#In this loop tasks are created for each table defined in the YAML file
for S3File in S3Files:
for S3File, fieldName in S3File.items():
#Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File),
'getS3Data',
{}) #your configs here.
#Second step is upload S3 to GCS
upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})
#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.
Окончательное определение DAG:
Идея состоит в том, что
#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks.
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
Полный код воздушного потока в порядке:
import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
start = DummyOperator(
task_id='start',
dag=dag
)
def createDynamicDAG(task_id, callableFunction, args):
task = PythonOperator(
task_id = task_id,
provide_context=True,
#Eval is used since the callableFunction var is of type string
#while the python_callable argument for PythonOperators only receives objects of type callable not strings.
python_callable = eval(callableFunction),
op_kwargs = args,
xcom_push = True,
dag = dag,
)
return task
end = DummyOperator(
task_id='end',
dag=dag)
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
configFile = yaml.safe_load(f)
#Extract file list
S3Files = configFile['S3Files']
#In this loop tasks are created for each table defined in the YAML file
for S3File in S3Files:
for S3File, fieldName in S3File.items():
#Remember task id is provided in order to exchange data among tasks generated in dynamic way.
get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File),
'getS3Data',
{}) #your configs here.
#Second step is upload S3 to GCS
upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})
#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end