Для ясности, ваши команды на самом деле установят два подключения к базе данных, но это к двум отдельным базам данных (если вы не пытаетесь подключиться к вашей Postgres базе данных Airflow). Первая строка инициализации ловушки не должна создавать никаких соединений. Только вторая строка сначала получает информацию о соединении из базы данных Airflow (которую, я не думаю, вы можете избежать), а затем использует ее для соединения с базой данных Postgres (что, я думаю, является точкой).
Вы можете сделать немного проще, хотя с помощью:
postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()
Это кажется довольно чистым, но если вы хотите получить еще более прямой, не проходя через PostgresHook
, вы можете получить его напрямую, запросив базу данных Airflow. Однако это означает, что вы в конечном итоге продублируете код для создания URI из объекта подключения. Лежащая в основе реализация get_connection () является хорошим примером, если вы хотите продолжить это.
from airflow.settings import Session
conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
create_engine(uri)
Кроме того, если вы хотите иметь возможность доступа к extras
без Отдельная выборка из базы данных помимо того, что get_uri()
или get_sqlalchemy_engine()
делает, вы можете переопределить BaseHook.get_connection () , чтобы сохранить объект соединения в переменной экземпляра для повторного использования. Для этого потребуется создать свой собственный хук поверх PostgresHook
, поэтому я понимаю, что это может быть не идеально.
class CustomPostgresHook(PostgresHook):
@classmethod
def get_connection(cls, conn_id): # type: (str) -> Connection
conn = super().get_connection(conn_id)
self.conn_obj = conn # can't use self.conn because PostgresHook will overriden in https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93 by a different type of connection
return conn
postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extras_dejson
Некоторые встроенные хуки для Airflow уже имеют такое поведение (grp c, samba, tableau ), но это определенно не стандартизировано.