Постановка проблемы:
Я пытаюсь использовать BigqueryOperator в потоке воздуха. Цель состоит в том, чтобы читать одни и те же запросы как можно больше раз при динамическом c изменении имен наборов данных ie имена наборов данных будут переданы в качестве параметра.
пример: project.dataset1_layer1.tablename1
, project.dataset2_layer1.tablename1
Ожидается: Я хочу сохранить одну копию SQL, в которой я могу передать имена наборов данных в качестве параметров, которые могут быть заменены для этого конкретного набора данных.
Сообщения об ошибках :
Я пытался передать имя набора данных Dynami c как часть query_params. Но он потерпел неудачу с приведенным ниже сообщением об ошибке.
Запрос был проанализирован как INFO - Executing: [u'SELECT col1, col2 FROM project.@partner_layer1.tablename']
ERROR - BigQuery job failed. Final error was: {u'reason': u'invalidQuery', u'message': u'Query parameters cannot be used in place of table names at [1:37]', u'location': u'query'}. u'CREATE_IF_NEEDED', u'query': u'SELECT col1, col2 FROM project.@partner_layer1.tablename'}, u'jobType': u'QUERY'}}
`
Вещи, которые я пробовал до сих пор
Query Temaplate temp.sql
выглядит следующим образом:
SELECT col1, col2 FROM `project.@partner_layer1.tablename`;
Воздушный поток BigqueryOperator используется следующим образом:
query_template_dict = {
'partner_list' = ['val1', 'val2', 'val3', 'val4']
'google_project': 'project_name',
'queries': {
'layer3': {
'template': 'temp.sql',
'output_dataset': '_layer3',
'output_tbl': 'table_{}'.format(table_date),
'output_tbl_schema': 'temp.txt'
}
},
'applicable_tasks': {
'val1': {
'table_layer3': []
},
'val2': {
'table_layer3': []
},
'val3': {
'table_layer3': []
},
'val4': {
'table_layer3': []
}
}
}
for partner in query_template_dict['partner_list']:
# Loop over applicable report queries for a partner
applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
for task in applicable_tasks:
destination_tbl = '{}.{}{}.{}'.format(query_template_dict['google_project'], partner,
query_template_dict['queries'][task]['output_dataset'] ,
query_template_dict['queries'][task]['output_tbl'])
}
#Actual destination table structure
#destination_tbl = 'project.partner_layer3.table_20200223'
run_bq_cmd = BigQueryOperator (
task_id =partner + '-' + task,
sql =[query_template_dict['queries'][task]['template']],
destination_dataset_table =destination_tbl,
use_legacy_sql =False,
write_disposition ='WRITE_APPEND',
create_disposition ='CREATE_IF_NEEDED',
allow_large_results =True,
query_params=[
{
"name": "partner",
"parameterType": { "type": "STRING" },
"parameterValue": { "value": partner}
},
{
"name": "batch_date",
"parameterType": { "type": "STRING" },
"parameterValue": { "value": batch_date}
}
],
dag=dag,
Кто-нибудь может мне помочь с этой проблемой? Есть ли в BigQuery ограничение на динамическую передачу имен наборов данных?