Движок SQLAlchemy из ловушки базы данных Airflow - PullRequest
1 голос
/ 30 апреля 2020

Какой лучший способ получить движок SQLAlchemy из идентификатора соединения Airflow?

В настоящее время я создаю ловушку, извлекаю ее URI, а затем использую его для создания движка SQLAlchemy.

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())

Это работает, но обе команды устанавливают соединение с базой данных.

Когда у меня есть «дополнительные» параметры соединения, для их получения требуется третье соединение (см. Извлечение полного URI соединения из Airflow Postgres hook )

Есть ли более короткий и более прямой метод?

1 Ответ

2 голосов
/ 02 мая 2020

Для ясности, ваши команды на самом деле установят два подключения к базе данных, но это к двум отдельным базам данных (если вы не пытаетесь подключиться к вашей Postgres базе данных Airflow). Первая строка инициализации ловушки не должна создавать никаких соединений. Только вторая строка сначала получает информацию о соединении из базы данных Airflow (которую, я не думаю, вы можете избежать), а затем использует ее для соединения с базой данных Postgres (что, я думаю, является точкой).

Вы можете сделать немного проще, хотя с помощью:

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()

Это кажется довольно чистым, но если вы хотите получить еще более прямой, не проходя через PostgresHook, вы можете получить его напрямую, запросив базу данных Airflow. Однако это означает, что вы в конечном итоге продублируете код для создания URI из объекта подключения. Лежащая в основе реализация get_connection () является хорошим примером, если вы хотите продолжить это.

from airflow.settings import Session

conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
create_engine(uri)

Кроме того, если вы хотите иметь возможность доступа к extras без Отдельная выборка из базы данных помимо того, что get_uri() или get_sqlalchemy_engine() делает, вы можете переопределить BaseHook.get_connection () , чтобы сохранить объект соединения в переменной экземпляра для повторного использования. Для этого потребуется создать свой собственный хук поверх PostgresHook, поэтому я понимаю, что это может быть не идеально.

class CustomPostgresHook(PostgresHook):

    @classmethod
    def get_connection(cls, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        self.conn_obj = conn  # can't use self.conn because PostgresHook will overriden in https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93 by a different type of connection
        return conn

postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extras_dejson

Некоторые встроенные хуки для Airflow уже имеют такое поведение (grp c, samba, tableau ), но это определенно не стандартизировано.

...