Нет доступа к Airflow default_args - PullRequest
       104

Нет доступа к Airflow default_args

0 голосов
/ 06 августа 2020

Я новичок в воздушном потоке, может ли кто-нибудь помочь мне с этим, так как я не могу получить доступ к 'db_conn' внутри моего пользовательского оператора, этот аргумент определен в default_args.

**Dag details:**

default_args = {
    'owner': 'airflow',
    'email': ['example@hotmail.com'],
    'db_conn': 'database_connection'
}

dag = DAG(dag_id='my_custom_operator_dag', description='Another tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2020, 8, 6), 
          catchup=False,
          default_args=default_args)

operator_task_start = MyOperator(                                
                                task_id='my_custom_operator', dag=dag
                                )

**Operator details:**
class MyOperator(BaseOperator):

    #@apply_defaults
    def __init__(self,
                 *args, 
                 **kwargs):          
        super(MyOperator, self).__init__(*args,**kwargs)
    def execute(self, context):        
        log.info('owner: %s', self.owner)
        log.info('email: %s', self.email)
        log.info('db_conn: %s', self.db_conn) 
        # Error here, AttributeError: 'MyOperator' object has no attribute 'db_conn

1 Ответ

0 голосов
/ 06 августа 2020

Кажется, вы неправильно поняли default_args. default_args - это просто сокращение (очистка кода / рефакторинг / краткость) для передачи общих (которые имеют одинаковое значение для всех операторов DAG, например owner) аргументов для всех ваших operator s, задав их как по умолчанию и передается самому DAG. Цитирование комментария к строке документации from DAG params

:param default_args: A dictionary of default parameters to be used
        as constructor keyword parameters when initialising operators.
        Note that operators have the same hook, and precede those defined
        here, meaning that if your dict contains `'depends_on_past': True`
        here and `'depends_on_past': False` in the operator's call
        `default_args`, the actual value will be `False`.
:type default_args: dict

Так ясно, что default_args работает, любые key s который вы передаете, должен быть аргумент вашего Operator class es.

Не только это, обратите внимание, что передача недопустимых (несуществующих) аргументов в Operator конструктор (ы) будет штрафуется в Airflow 2.0 (так что лучше не передавать)

'Invalid arguments were passed to {c} (task_id: {t}). '
'Support for passing such arguments will be dropped in '
'future. ..

Надеюсь, к настоящему времени должно быть ясно, что для выполнения этой работы вы должны добавить параметр db_conn в конструктор вашего MyOperator класса

**Operator details:**
class MyOperator(BaseOperator):

    #@apply_defaults
    def __init__(self,
                 db_conn: str,
                 *args, 
                 **kwargs):          
        super(MyOperator, self).__init__(*args,**kwargs)
        self.db_conn: str = db_conn

И пока мы занимаемся этим, позвольте мне предложить вам предложение: для чего-то вроде подключения предпочтительно использовать функцию Connection , предлагаемую Airflow, которая упрощает ваше взаимодействие с внешними службами

  • делает их управляемыми (просмотр / редактирование через пользовательский интерфейс)
  • безопасными (они хранятся зашифрованы в db )
  • поддержка балансировки нагрузки (определите несколько соединений с одним и тем же conn_id, Airflow будет случайным образом распределять вызовы одному из них)

Когда существует более одного соединения с одним и тем же conn_id, get_connection () Метод на BaseHook выберет одно соединение случайным образом. Это может использоваться для обеспечения базовой c балансировки нагрузки и отказоустойчивости при использовании в сочетании с повторными попытками.

Они также используют модель airflow.models.connection.Connection для получения имен хостов и информации аутентификации. Хуки хранят код аутентификации и информацию вне конвейеров, централизованно в базе данных метаданных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...