Как автоматизировать конвейер BigQuery SQL - PullRequest
1 голос
/ 21 апреля 2020

Я создал конвейер данных, используя BigQuery SQL. Он начинается с импорта файла CSV из облачного хранилища, затем выполняется другой анализ, включая прогнозное моделирование с использованием расчета BigQueryML Geography с использованием функций Geography и вычисление KPI с использованием функций Analyti c.

Я могу успешно запустить различные запросы вручную, и теперь я хотел бы автоматизировать конвейер данных.

Моим первым выбором был DataFlow SQL, но оказалось, что синтаксис запроса Dataflow SQL не поддерживает функции географии.

DataFlow python является менее подходящим вариантом, поскольку полный анализ выполняется в SQL, и я хотел бы сохранить его таким образом.

Мой вопрос заключается в том, какие другие опции GCP доступны для автоматизации конвейер данных.

Ответы [ 2 ]

1 голос
/ 22 апреля 2020

Как я уже упоминал в комментарии, если вам нужно организовать свои запросы, вы можете использовать Cloud Composer, полностью управляемый Airflow кластер.

Я создал код ниже, чтобы показать вам более или менее, как Не могли бы вы организовать ваши запросы с помощью этого инструмента. Обратите внимание, что это базовый код c, и его можно улучшить с точки зрения стандартов кодирования. Код в основном организовывает 3 запроса:

  1. Первый читает из таблицы publi c и записывает в другую таблицу в вашем проекте
  2. Второй читает таблицу, созданную в вашем Первый запрос и выберите 10000 новых строк в столбце даты. После этого он сохраняет результат в таблицу в вашем проекте.
  3. Третий читает таблицу, созданную на шаге 2, и вычисляет некоторые агрегации. После этого результаты сохраняются в другой таблице вашего проекта.

    import datetime
    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
    """The condiguration presented below will run your DAG every five minutes as specified in the 
    schedule_interval property starting from the datetime specified in the start_date property"""
    
    default_dag_args = {
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    }
    
    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.orchestration_1` ORDER BY date LIMIT 10000 ",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM`<your_project>.orchestration_2` GROUP BY r_lat,r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
    
       # Define DAG dependencies.
        run_first_query >> run_second_query >> run_third_query
    

Шаг за шагом:

  • Во-первых, он импортировал некоторые библиотеки Airflow, такие как модели и bigquery_operator

    from airflow import models
    from airflow.contrib.operators import bigquery_operator
    
  • Затем он определил диктовку с именем default_dag_args, которая будет использоваться в дальнейшем при создании вашей DAG.

    default_dag_args = {
        'start_date': datetime.datetime(2020, 4, 22, 15, 40), 
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=1),
        'project_id': "<your_project_id>",
    }
    
  • Когда вы создаете свою группу обеспечения доступности баз данных, вы передаете диктовку default_dag_args в качестве аргумента по умолчанию и добавляете аргумент schedule interval, который будет определять, когда ваша группа обеспечения доступности баз данных должна выполняться. Вы можете использовать этот аргумент с некоторыми предустановленными выражениями или с помощью выражений CRON, как вы можете видеть здесь

    with models.DAG(
            'composer_airflow_bigquery_orchestration',
            schedule_interval = "*/5 * * * *",
            default_args=default_dag_args) as dag:
    
  • После этого вы можете создавать экземпляры вашего оператора. В этом случае мы используем только BigQueryOperator

        run_first_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `bigquery-public-data.catalonian_mobile_coverage.mobile_data_2015_2017`",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_1",
            task_id = 'xxxxxxxx',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_second_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT * FROM `<your_project>.orchestration_1` ORDER BY date LIMIT 10000 ",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_2",
            task_id = 'yyyyyyyy',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
        run_third_query = bigquery_operator.BigQueryOperator(
            sql = "SELECT round(lat) r_lat, round(long) r_long, count(1) total FROM`<your_project>.orchestration_2` GROUP BY r_lat,r_long",
            destination_dataset_table = "<your_project>.<your_dataset>.orchestration_3",
            task_id = 'zzzzzzzz',
            write_disposition = "WRITE_TRUNCATE",
            #create_disposition = "",
            allow_large_results = True,
            use_legacy_sql = False
        )
    
  • В качестве последнего шага мы можем определить зависимости для группы доступности базы данных. Этот фрагмент кода означает, что операция run_second_query зависит от завершения run_first_query и поэтому выполняется.

        run_first_query >> run_second_query >> run_third_query
    

Наконец, я хотел бы добавить эту статью которые обсуждают, как правильно установить start_date и schedule_interval при использовании выражений CRON.

0 голосов
/ 22 апреля 2020

BigQuery имеет встроенный механизм планирования, который в настоящее время находится в стадии бета-тестирования.

Для автоматизации собственного конвейера BQ SQL вы можете использовать эту утилиту. Использование CLI:

$ bq query \
--use_legacy_sql=false \
--destination_table=mydataset.mytable \
--display_name='My Scheduled Query' \
--replace=true \
'SELECT
1
FROM
mydataset.test'
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...