Есть несколько способов сделать это в зависимости от настроек:
Вы можете использовать подход типа DagFactory, когда у вас есть функция, генерирующая группы доступности баз данных.Вы можете найти пример того, как это выглядит здесь
Вы можете сохранить конфигурацию JSON как переменную воздушного потока и проанализировать еесоздать DAG.Вы можете сохранить что-то вроде этого в Admin -> Переменные:
[{
"table": "users",
"schema":"app_one",
"s3_bucket":"etl_bucket",
"s3_key":"app_one_users",
"redshift_conn_id":"postgres_default" },
{
"table": "users",
"schema":"app_two",
"s3_bucket":"etl_bucket",
"s3_key":"app_two_users",
"redshift_conn_id":"postgres_default"}]
Ваш DAG может быть сгенерирован как:
sync_config = json.loads(Variable.get("sync_config"))
with dag:
start = DummyOperator(task_id='begin_dag')
for table in sync_config:
d1 = RedshiftToS3Transfer(
task_id='{0}'.format(table['s3_key']),
table=table['table'],
schema=table['schema'],
s3_bucket=table['s3_bucket'],
s3_key=table['s3_key'],
redshift_conn_id=table['redshift_conn_id']
)
start >> d1
Точно так же вы можете просто сохранить эту конфигурацию как локальный файл и открыть его, как любой другой файл.Имейте в виду, что лучший ответ на этот вопрос будет зависеть от вашей инфраструктуры и варианта использования.