Как использовать объекты DatabaseHook с PythonOperator в Airflow без исчерпания соединений? - PullRequest
1 голос
/ 26 февраля 2020

Я пытаюсь сохранить свои учетные данные базы данных с помощью Airflow Connections и использовать их с PythonOperators. Я заметил, что если я передаю учетные данные в PythonOperator, то регистрируется каждая переменная, включая пароль базы данных. Поэтому я перешел, чтобы передать сам объект соединения PythonOperator, в соответствии с приведенным ниже примером.

Но проблема, с которой я сталкиваюсь сейчас, заключается в том, что воздушный поток создает тонну этих объектов, даже несмотря на то, что этот переход запланирован на ежедневную работу, что часто приводит к проблемам с достижением предела подключения. Как использовать PostgresHook с PythonOperator, не используя тонну соединений для скрипта данных в Airflow?

import sys
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

try:
    sys.path.append('/path/to/my/awesome/module/')
    from awesome_module import function_1, function_1
except:
    raise ImportError("Couldn't import awesome_module")

postgres_hook_object = PostgresHook("dedicated_bot_account")


with postgres_hook_object.get_conn() as con:
    t1 = PythonOperator(
            task_id = 'function_1',
            python_callable = function_1, 
            dag = dag,
            op_kwargs = {'conn':con}
            )

    t2 = PythonOperator(
            task_id = 'function_2',
            python_callable = function_2,
            dag = dag,
            op_args = [con, service]
            )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...