параметр имеет значение None при наследовании от BigQueryOperator в Apache Airflow 1.10.2 - PullRequest
0 голосов
/ 27 июня 2019

Я обновляюсь до Airflow 1.10.2 с 1.9.0, в python 2.7, и у меня проблемы с airflow/contrib/operators/bigquery_operator.py, точнее с устаревшим параметром bql в пользу sql

У меня есть иерархия классов, основанная на BigQueryOperator

BigQueryToPartitionTableOperator -> BigQueryFromExternalSqlOperator -> BigQueryOperator
class BigQueryFromExternalSqlOperator(BigQueryOperator):
    template_fields = BigQueryOperator.template_fields + ('get_sql_kwargs',)

    def __init__(self, get_sql_func, get_sql_kwargs={}, *args, **kwargs):

        super(BigQueryFromExternalSqlOperator, self).__init__(bql='',  #/!\ problematic parameter
                                                              *args,
                                                              **kwargs)
        self.get_sql_func = get_sql_func
        self.get_sql_kwargs = get_sql_kwargs

    def get_sql(self):
        return self.get_sql_func(**self.get_sql_kwargs)

    def pre_execute(self, context):
        self.bql = self.get_sql()


class BigQueryToPartitionTableOperator(BigQueryFromExternalSqlOperator):
    template_fields = ('get_schema_kwargs',) + BigQueryFromExternalSqlOperator.template_fields
    template_ext = ('_.sql',)

    def __init__(self, get_schema_func, get_schema_kwargs={}, *args, **kwargs):

        super(BigQueryToPartitionTableOperator, self).__init__(*args, **kwargs)

        self.hook = BigQueryTableHook(bigquery_conn_id=self.bigquery_conn_id,
                                      delegate_to=self.delegate_to)
        self.get_schema_func = get_schema_func
        self.get_schema_kwargs = get_schema_kwargs
        self.schema = None

Один из моих DAG использует BigQueryToPartitionTableOperator.Когда я делаю airflow list_dags, чтобы получить анализируемые, вот что я получаю

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 374, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/home/airflow/gcs/dags/processing/dags/learning/clustering_activity/dag.py", line 37, in <module>
    "period": Variable.get("activity_clustering.period")
  File "/home/airflow/gcs/dags/processing/common/dags/inference_dag.py", line 215, in __enter__
    dataset_partitioned=self.dataset,
  File "/home/airflow/gcs/dags/processing/common/operators/big_query_operator.py", line 79, in __init__
    super(BigQueryShardedToPartitionedOperator, self).__init__(bql=None, *args, **kwargs)
  File "/usr/local/lib/airflow/airflow/utils/decorators.py", line 97, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 176, in __init__
    'argument: `sql`'.format(self.task_id))
TypeError: inferred_to_partitioned missing 1 required positional argument: `sql`

Когда я проверяю код для BigQueryOpertor , наиболее релевантная часть в __init__ ниже,self.sql проверено

@apply_defaults
def __init__(sql=None, bql=None, ...):
    ...
    self.sql = sql if sql else bql # /!\ how self.sql is set
    ...
    # TODO remove `bql` in Airflow 2.0
        if self.bql:
            import warnings
            warnings.warn('Deprecated parameter `bql` used in Task id: {}. '
                          'Use `sql` parameter instead to pass the sql to be '
                          'executed. `bql` parameter is deprecated and '
                          'will be removed in a future version of '
                          'Airflow.'.format(self.task_id),
                          category=DeprecationWarning)

        if self.sql is None:
            raise TypeError('{} missing 1 required positional '
'argument: `sql`'.format(self.task_id))

, хотя я поставил значение по умолчанию для bql, bql='' в BigQueryFromExternalSqlOperator Я все еще получаю то же исключение, что и выше.

Я не знаю, связано ли это с наследованием и аргументами по умолчанию в python при создании экземпляров объектов.

Или, возможно, декоратор apply_defaults в decorators.py изменяет параметры, переданные функции BigQueryOperator __init__.

РЕДАКТИРОВАТЬ 1: вот как я вызываю оператора

class myDAG(DAG):

...
    def __enter__():
        ...
        # Save the input dataset in version-suffixed table in BQ
        extract_dataset = BigQueryToPartitionTableOperator(task_id='extract_dataset',
                                                           get_sql_func=self.get_sql,
                                                           get_schema_func=self.get_schema,
                                                           get_sql_kwargs=self.get_extract_dataset_sql_kwargs,
                                                           get_schema_kwargs=self.get_extracted_table_schema_kwargs,
                                                           destination_dataset_table='{}.{}'.format(
                                                               self.dataset,
                                                               self.extracted_table),
                                                           write_disposition='WRITE_TRUNCATE',
                                                           use_legacy_sql=False,
                                                           bigquery_conn_id=self.gcp_conn_id)

1 Ответ

1 голос
/ 01 июля 2019

Спасибо за добавление фрагмента.Если я правильно понимаю, вы не передаете аргумент sql о том, что жалуется на сообщение об ошибке TypeError: inferred_to_partitioned missing 1 required positional argument: sql

Попробуйте исправить это так:

  • pass sql атрибут вашего родительского BigQueryOpertor, который не пустой, просто для отладки
class BigQueryFromExternalSqlOperator(BigQueryOperator):
    template_fields = BigQueryOperator.template_fields + ('get_sql_kwargs',)

    def __init__(self, get_sql_func, get_sql_kwargs={}, *args, **kwargs):

        super(BigQueryFromExternalSqlOperator, self).__init__(sql = 'SELECT ....',
                                                              *args,
                                                              **kwargs)
  • , если после этого исчезла ошибка «отсутствует 1 требуемый аргумент позиции: sql», найдите способпередайте ваш запрос в аргумент BigQueryOperator sql или, если вы не хотите делегировать выполнение запроса, переопределите метод, который его выполняет.Но если вам не нужно выполнение BigQueryOperator, избавиться от этого родителя будет проще.
...