Есть ли способ использовать имя набора данных Dynami c в BigQuery - PullRequest
0 голосов
/ 25 февраля 2020

Постановка проблемы:

Я пытаюсь использовать BigqueryOperator в потоке воздуха. Цель состоит в том, чтобы читать одни и те же запросы как можно больше раз при динамическом c изменении имен наборов данных ie имена наборов данных будут переданы в качестве параметра.

пример: project.dataset1_layer1.tablename1, project.dataset2_layer1.tablename1

Ожидается: Я хочу сохранить одну копию SQL, в которой я могу передать имена наборов данных в качестве параметров, которые могут быть заменены для этого конкретного набора данных.

Сообщения об ошибках :

Я пытался передать имя набора данных Dynami c как часть query_params. Но он потерпел неудачу с приведенным ниже сообщением об ошибке.

Запрос был проанализирован как INFO - Executing: [u'SELECT col1, col2 FROM project.@partner_layer1.tablename']

ERROR - BigQuery job failed. Final error was: {u'reason': u'invalidQuery', u'message': u'Query parameters cannot be used in place of table names at [1:37]', u'location': u'query'}. u'CREATE_IF_NEEDED', u'query': u'SELECT col1, col2 FROM project.@partner_layer1.tablename'}, u'jobType': u'QUERY'}} `

Вещи, которые я пробовал до сих пор

Query Temaplate temp.sql выглядит следующим образом:

SELECT col1, col2 FROM `project.@partner_layer1.tablename`;

Воздушный поток BigqueryOperator используется следующим образом:

query_template_dict = {
    'partner_list' = ['val1', 'val2', 'val3', 'val4']
    'google_project': 'project_name',
    'queries': {
        'layer3': {
            'template':             'temp.sql',
            'output_dataset':       '_layer3',
            'output_tbl':           'table_{}'.format(table_date),
            'output_tbl_schema':    'temp.txt'
        }
    },
    'applicable_tasks': {
        'val1': {
            'table_layer3': []
        },
        'val2': {
            'table_layer3': []
        },
        'val3': {
            'table_layer3': []
        },
        'val4': {
            'table_layer3': []
        }

    }
}


for partner in query_template_dict['partner_list']:
    # Loop over applicable report queries for a partner
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:
        destination_tbl = '{}.{}{}.{}'.format(query_template_dict['google_project'], partner,
                                              query_template_dict['queries'][task]['output_dataset'] , 
                                              query_template_dict['queries'][task]['output_tbl'])
                                              }
        #Actual destination table structure
        #destination_tbl = 'project.partner_layer3.table_20200223'  
run_bq_cmd = BigQueryOperator (
                        task_id                                 =partner + '-' + task,
                        sql                                     =[query_template_dict['queries'][task]['template']],
                        destination_dataset_table               =destination_tbl,
                        use_legacy_sql                          =False,
                        write_disposition                       ='WRITE_APPEND',
                        create_disposition                      ='CREATE_IF_NEEDED',
                        allow_large_results                     =True,
                        query_params=[
                                {
                                        "name":                 "partner",
                                        "parameterType":        { "type": "STRING" },
                                        "parameterValue":       { "value": partner}
                                },

                                {
                                         "name":             "batch_date",
                                         "parameterType":    { "type": "STRING" },
                                         "parameterValue":   { "value": batch_date}
                                }
                        ],
                        dag=dag,

Кто-нибудь может мне помочь с этой проблемой? Есть ли в BigQuery ограничение на динамическую передачу имен наборов данных?

1 Ответ

0 голосов
/ 27 февраля 2020

Замените имя набора данных в Airflow, а не в BigQuery.

Сделайте это до отправки запроса в BigQuery - используйте Python замена строки в Airflow.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...