Я обновляюсь до Airflow 1.10.2 с 1.9.0, в python 2.7, и у меня проблемы с airflow/contrib/operators/bigquery_operator.py
, точнее с устаревшим параметром bql
в пользу sql
У меня есть иерархия классов, основанная на BigQueryOperator
BigQueryToPartitionTableOperator -> BigQueryFromExternalSqlOperator -> BigQueryOperator
class BigQueryFromExternalSqlOperator(BigQueryOperator):
template_fields = BigQueryOperator.template_fields + ('get_sql_kwargs',)
def __init__(self, get_sql_func, get_sql_kwargs={}, *args, **kwargs):
super(BigQueryFromExternalSqlOperator, self).__init__(bql='', #/!\ problematic parameter
*args,
**kwargs)
self.get_sql_func = get_sql_func
self.get_sql_kwargs = get_sql_kwargs
def get_sql(self):
return self.get_sql_func(**self.get_sql_kwargs)
def pre_execute(self, context):
self.bql = self.get_sql()
class BigQueryToPartitionTableOperator(BigQueryFromExternalSqlOperator):
template_fields = ('get_schema_kwargs',) + BigQueryFromExternalSqlOperator.template_fields
template_ext = ('_.sql',)
def __init__(self, get_schema_func, get_schema_kwargs={}, *args, **kwargs):
super(BigQueryToPartitionTableOperator, self).__init__(*args, **kwargs)
self.hook = BigQueryTableHook(bigquery_conn_id=self.bigquery_conn_id,
delegate_to=self.delegate_to)
self.get_schema_func = get_schema_func
self.get_schema_kwargs = get_schema_kwargs
self.schema = None
Один из моих DAG использует BigQueryToPartitionTableOperator
.Когда я делаю airflow list_dags
, чтобы получить анализируемые, вот что я получаю
Traceback (most recent call last):
File "/usr/local/lib/airflow/airflow/models.py", line 374, in process_file
m = imp.load_source(mod_name, filepath)
File "/home/airflow/gcs/dags/processing/dags/learning/clustering_activity/dag.py", line 37, in <module>
"period": Variable.get("activity_clustering.period")
File "/home/airflow/gcs/dags/processing/common/dags/inference_dag.py", line 215, in __enter__
dataset_partitioned=self.dataset,
File "/home/airflow/gcs/dags/processing/common/operators/big_query_operator.py", line 79, in __init__
super(BigQueryShardedToPartitionedOperator, self).__init__(bql=None, *args, **kwargs)
File "/usr/local/lib/airflow/airflow/utils/decorators.py", line 97, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 176, in __init__
'argument: `sql`'.format(self.task_id))
TypeError: inferred_to_partitioned missing 1 required positional argument: `sql`
Когда я проверяю код для BigQueryOpertor , наиболее релевантная часть в __init__
ниже,self.sql
проверено
@apply_defaults
def __init__(sql=None, bql=None, ...):
...
self.sql = sql if sql else bql # /!\ how self.sql is set
...
# TODO remove `bql` in Airflow 2.0
if self.bql:
import warnings
warnings.warn('Deprecated parameter `bql` used in Task id: {}. '
'Use `sql` parameter instead to pass the sql to be '
'executed. `bql` parameter is deprecated and '
'will be removed in a future version of '
'Airflow.'.format(self.task_id),
category=DeprecationWarning)
if self.sql is None:
raise TypeError('{} missing 1 required positional '
'argument: `sql`'.format(self.task_id))
, хотя я поставил значение по умолчанию для bql
, bql=''
в BigQueryFromExternalSqlOperator
Я все еще получаю то же исключение, что и выше.
Я не знаю, связано ли это с наследованием и аргументами по умолчанию в python при создании экземпляров объектов.
Или, возможно, декоратор apply_defaults
в decorators.py изменяет параметры, переданные функции BigQueryOperator
__init__
.
РЕДАКТИРОВАТЬ 1: вот как я вызываю оператора
class myDAG(DAG):
...
def __enter__():
...
# Save the input dataset in version-suffixed table in BQ
extract_dataset = BigQueryToPartitionTableOperator(task_id='extract_dataset',
get_sql_func=self.get_sql,
get_schema_func=self.get_schema,
get_sql_kwargs=self.get_extract_dataset_sql_kwargs,
get_schema_kwargs=self.get_extracted_table_schema_kwargs,
destination_dataset_table='{}.{}'.format(
self.dataset,
self.extracted_table),
write_disposition='WRITE_TRUNCATE',
use_legacy_sql=False,
bigquery_conn_id=self.gcp_conn_id)