Как передать параметр запроса в файл SQL с помощью оператора BigQuery - PullRequest
0 голосов
/ 24 мая 2019

Мне нужен доступ к параметру, переданному BigqueryOperator в файл sql, но я получаю сообщение об ошибке ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'> Я использую код ниже:

t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
                                            'xxxx',
                                            'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag

)

Sql:

select  cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3 
from (select cast(@threshold_date as date) as dt_2) a

Я использую Google Composer версии composer-1.7.0-airflow-1.10.2

Заранее спасибо.

1 Ответ

1 голос
/ 24 мая 2019

После погружения в исходный код, похоже, что BigQueryHook исправлена ​​ошибка в Airflow 1.10.3.

То, как вы определили query_params, подходит для более новых версий Airflow и должно быть списком в соответствии с API BigQuery: см. https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python.

В любом случае, вы получаете эту ошибку, потому что в Airflow 1.10.2 query_params определяется как dict, см .:

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L678

query_param_list = [
    ...
    (query_params, 'queryParameters', None, dict),
    ...
]

Это заставляет внутреннюю функцию _validate_value выдавать TypeError:

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L1954

def _validate_value(key, value, expected_type):
    """ function to check expected type and raise
    error if type is not correct """
    if not isinstance(value, expected_type):
        raise TypeError("{} argument must have a type {} not {}".format(
            key, expected_type, type(value)))

Я не нашел ни одного примера query_params в Airflow 1.10.2 (или каких-либо модульных тестах ...), но я думаю, что это только потому, что он не применим.

Эти ошибки были исправлены этими коммитами:

Эти изменения были встроены в Airflow 1.10.3, но на данный момент Airflow 1.10.3 недоступен в Composer (https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments): последняя версия была выпущена 16 мая 2019 года и содержит версию 1.10 0,2.

В ожидании этой новой версии я вижу 2 способа решения вашей проблемы:

  • копируйте / вставляйте фиксированные версии BigQueryOperator и BigQueryHook и вставляйте их в свои источники, чтобы использовать их, или расширяйте существующие BigQueryHook и переопределяйте ошибочные методы. Я не уверен, что вы можете исправлять BigQueryHook напрямую (нет доступа к этим файлам в среде Composer)
  • шаблонизируйте свой SQL-запрос самостоятельно (и не используйте query_params)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...