Как создать операторов из списка в Airflow? - PullRequest
0 голосов
/ 13 декабря 2018

Мне нужно ежедневно копировать таблицы из MySQL в BigQuery.Мой рабочий процесс:

  1. MySqlToGoogleCloudStorageOperator
  2. GoogleCloudStorageToBigQueryOperator

Это работает для одного процесса (скажем, Categories).

Пример:

BQ_TABLE_NAME_CATEGORIES = Variable.get("tables_categories")
...

import_categories_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_categories',
    mysql_conn_id='c_mysql',
    google_cloud_storage_conn_id='gcp_a',
    approx_max_file_size_bytes = 100000000, #100MB per file
    sql = 'import_categories.sql',
    bucket=GCS_BUCKET_ID,
    filename=file_name_categories,
    dag=dag)

gcs_to_bigquery_categories_op = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_categories_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template_categories,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[uri_template_categories_read_from],
    schema_fields=Categories(),
    src_fmt_configs={'ignoreUnknownValues': True},
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID)


import_categories_op >> gcs_to_bigquery_categories_op 

Теперь, скажем, я хочу увеличить его и заставить его работать еще с 20 таблицами. Есть ли способ сделать это без написания одного и того же кода 20 раз?Я ищу способ сделать что-то вроде:

BQ_TABLE_NAME_CATEGORIES = Variable.get("tables_categories")
BQ_TABLE_NAME_PRODUCTS = Variable.get("tables_products")
....
BQ_TABLE_NAME_ORDERS = Variable.get("tables_orders")
list = [BQ_TABLE_NAME_CATEGORIES,BQ_TABLE_NAME_PRODUCTS,BQ_TABLE_NAME_PRODUCTS  ]
for item in list:
    GENERATE THE OPERATORS PER TABLE

, чтобы создать import_categories_op, import_products_op, import_orders_op и т. Д.

1 Ответ

0 голосов
/ 13 декабря 2018

Да, на самом деле это именно то, что вы описали.Просто создайте свои операторы в цикле for.Убедитесь, что ваши идентификаторы задач уникальны, и вы настроены:

BQ_TABLE_NAME_CATEGORIES = Variable.get("tables_categories")
BQ_TABLE_NAME_PRODUCTS = Variable.get("tables_products")

list = [BQ_TABLE_NAME_CATEGORIES, BQ_TABLE_NAME_PRODUCTS]

for table in list:
    import_op = MySqlToGoogleCloudStorageOperator(
        task_id=`import_${table}`,
        mysql_conn_id='c_mysql',
        google_cloud_storage_conn_id='gcp_a',
        approx_max_file_size_bytes = 100000000, #100MB per file
        sql = `import_${table}.sql`,
        bucket=GCS_BUCKET_ID,
        filename=file_name,
        dag=dag)
    gcs_to_bigquery_op = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id=`load_${table}_to_BigQuery`,
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=[uri_template_read_from],
        schema_fields=Categories(),
        src_fmt_configs={'ignoreUnknownValues': True},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        skip_leading_rows = 1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID)


    import_op >> gcs_to_bigquery_op

Вы можете упростить это, если храните все таблицы в одной переменной:

// bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')

for table in BQ_TABLES:
    ...

Редактировать: ссылки на задачи противИдентификаторы

Луис спросил, как нужно менять только идентификаторы задач (а не ссылки на задачи).На самом деле вам даже не нужно ни к чему ссылаться на свои задачи, кроме добавления к ним некоторых деталей после создания (например, зависимостей вверх и вниз по течению), поскольку они сохраняются в объекте DAG при создании, и это все, что анализатор DAGнаходясь в поиске.Как только анализатор DAG находит объект DAG в глобальной области, он использует его.Он не знает, как называются задачи в глобальной области видимости, он знает только, что эти задачи перечислены в объекте DAG и что они перечисляют друг друга в восходящем или нисходящем направлении.

Я бы хотелсделал это комментарием к этому ответу, но я хотел показать следующий код, чтобы объяснить, что я имею в виду, более очевидно (в котором я использую with DAG, чтобы избежать назначения каждой задачи для dag, и оператора побитового сдвига upstream /нижестоящее назначение, чтобы избежать необходимости ссылаться на задачи по ссылке и отформатированным f-строкам в python3):

// bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')

with DAG('…dag_id…', …) as dag:
    for table in BQ_TABLES:
        MySqlToGoogleCloudStorageOperator(
            task_id=f'import_{table}',
            sql=f'import_{table}.sql',
            …  # all params except notably there's no `dag=dag` in here.
        ) >> GoogleCloudStorageToBigQueryOperator(  # Yup, …
            task_id=f'load_{table}_to_BigQuery',
            …  # again all but `dag=dag` in here.
        )

Конечно, это могло быть t1=…; t2=…; t1>>t2; …, но почему ссылки на имена?

...