Кажется, вы неправильно поняли default_args
. default_args
- это просто сокращение (очистка кода / рефакторинг / краткость) для передачи общих (которые имеют одинаковое значение для всех операторов DAG, например owner
) аргументов для всех ваших operator
s, задав их как по умолчанию и передается самому DAG
. Цитирование комментария к строке документации from DAG
params
:param default_args: A dictionary of default parameters to be used
as constructor keyword parameters when initialising operators.
Note that operators have the same hook, and precede those defined
here, meaning that if your dict contains `'depends_on_past': True`
here and `'depends_on_past': False` in the operator's call
`default_args`, the actual value will be `False`.
:type default_args: dict
Так ясно, что default_args
работает, любые key
s который вы передаете, должен быть аргумент вашего Operator
class
es.
Не только это, обратите внимание, что передача недопустимых (несуществующих) аргументов в Operator
конструктор (ы) будет штрафуется в Airflow 2.0 (так что лучше не передавать)
'Invalid arguments were passed to {c} (task_id: {t}). '
'Support for passing such arguments will be dropped in '
'future. ..
Надеюсь, к настоящему времени должно быть ясно, что для выполнения этой работы вы должны добавить параметр db_conn
в конструктор вашего MyOperator
класса
**Operator details:**
class MyOperator(BaseOperator):
#@apply_defaults
def __init__(self,
db_conn: str,
*args,
**kwargs):
super(MyOperator, self).__init__(*args,**kwargs)
self.db_conn: str = db_conn
И пока мы занимаемся этим, позвольте мне предложить вам предложение: для чего-то вроде подключения предпочтительно использовать функцию Connection
, предлагаемую Airflow, которая упрощает ваше взаимодействие с внешними службами
- делает их управляемыми (просмотр / редактирование через пользовательский интерфейс)
- безопасными (они хранятся зашифрованы в db )
- поддержка балансировки нагрузки (определите несколько соединений с одним и тем же
conn_id
, Airflow будет случайным образом распределять вызовы одному из них)
Когда существует более одного соединения с одним и тем же conn_id, get_connection () Метод на BaseHook выберет одно соединение случайным образом. Это может использоваться для обеспечения базовой c балансировки нагрузки и отказоустойчивости при использовании в сочетании с повторными попытками.
Они также используют модель airflow.models.connection.Connection для получения имен хостов и информации аутентификации. Хуки хранят код аутентификации и информацию вне конвейеров, централизованно в базе данных метаданных.