Да, на самом деле это именно то, что вы описали.Просто создайте свои операторы в цикле for.Убедитесь, что ваши идентификаторы задач уникальны, и вы настроены:
BQ_TABLE_NAME_CATEGORIES = Variable.get("tables_categories")
BQ_TABLE_NAME_PRODUCTS = Variable.get("tables_products")
list = [BQ_TABLE_NAME_CATEGORIES, BQ_TABLE_NAME_PRODUCTS]
for table in list:
import_op = MySqlToGoogleCloudStorageOperator(
task_id=`import_${table}`,
mysql_conn_id='c_mysql',
google_cloud_storage_conn_id='gcp_a',
approx_max_file_size_bytes = 100000000, #100MB per file
sql = `import_${table}.sql`,
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
gcs_to_bigquery_op = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id=`load_${table}_to_BigQuery`,
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='NEWLINE_DELIMITED_JSON',
source_objects=[uri_template_read_from],
schema_fields=Categories(),
src_fmt_configs={'ignoreUnknownValues': True},
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
skip_leading_rows = 1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID)
import_op >> gcs_to_bigquery_op
Вы можете упростить это, если храните все таблицы в одной переменной:
// bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')
for table in BQ_TABLES:
...
Редактировать: ссылки на задачи противИдентификаторы
Луис спросил, как нужно менять только идентификаторы задач (а не ссылки на задачи).На самом деле вам даже не нужно ни к чему ссылаться на свои задачи, кроме добавления к ним некоторых деталей после создания (например, зависимостей вверх и вниз по течению), поскольку они сохраняются в объекте DAG при создании, и это все, что анализатор DAGнаходясь в поиске.Как только анализатор DAG находит объект DAG в глобальной области, он использует его.Он не знает, как называются задачи в глобальной области видимости, он знает только, что эти задачи перечислены в объекте DAG и что они перечисляют друг друга в восходящем или нисходящем направлении.
Я бы хотелсделал это комментарием к этому ответу, но я хотел показать следующий код, чтобы объяснить, что я имею в виду, более очевидно (в котором я использую with DAG
, чтобы избежать назначения каждой задачи для dag, и оператора побитового сдвига upstream /нижестоящее назначение, чтобы избежать необходимости ссылаться на задачи по ссылке и отформатированным f-строкам в python3):
// bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')
with DAG('…dag_id…', …) as dag:
for table in BQ_TABLES:
MySqlToGoogleCloudStorageOperator(
task_id=f'import_{table}',
sql=f'import_{table}.sql',
… # all params except notably there's no `dag=dag` in here.
) >> GoogleCloudStorageToBigQueryOperator( # Yup, …
task_id=f'load_{table}_to_BigQuery',
… # again all but `dag=dag` in here.
)
Конечно, это могло быть t1=…; t2=…; t1>>t2; …
, но почему ссылки на имена?