После погружения в исходный код, похоже, что BigQueryHook
исправлена ошибка в Airflow 1.10.3.
То, как вы определили query_params
, подходит для более новых версий Airflow и должно быть списком в соответствии с API BigQuery: см. https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python.
В любом случае, вы получаете эту ошибку, потому что в Airflow 1.10.2 query_params
определяется как dict
, см .:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L678
query_param_list = [
...
(query_params, 'queryParameters', None, dict),
...
]
Это заставляет внутреннюю функцию _validate_value
выдавать TypeError
:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py#L1954
def _validate_value(key, value, expected_type):
""" function to check expected type and raise
error if type is not correct """
if not isinstance(value, expected_type):
raise TypeError("{} argument must have a type {} not {}".format(
key, expected_type, type(value)))
Я не нашел ни одного примера query_params
в Airflow 1.10.2 (или каких-либо модульных тестах ...), но я думаю, что это только потому, что он не применим.
Эти ошибки были исправлены этими коммитами:
Эти изменения были встроены в Airflow 1.10.3, но на данный момент Airflow 1.10.3 недоступен в Composer (https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments): последняя версия была выпущена 16 мая 2019 года и содержит версию 1.10 0,2.
В ожидании этой новой версии я вижу 2 способа решения вашей проблемы:
- копируйте / вставляйте фиксированные версии
BigQueryOperator
и BigQueryHook
и вставляйте их в свои источники, чтобы использовать их, или расширяйте существующие BigQueryHook
и переопределяйте ошибочные методы. Я не уверен, что вы можете исправлять BigQueryHook
напрямую (нет доступа к этим файлам в среде Composer)
- шаблонизируйте свой SQL-запрос самостоятельно (и не используйте
query_params
)