Как использовать перехватчики соединения с `KubernetesPodOperator` в качестве переменных среды на Apache Воздушный поток в облаке GCP Composer - PullRequest
2 голосов
/ 16 марта 2020

Я хотел бы использовать соединения, сохраненные в airflow, в задаче, которая использует KubernetesPodOperator.

. При разработке образа я использовал переменные среды для передачи базы данных. информация о соединении до контейнера, но в производственной среде базы данных сохраняются как перехватчики соединений.

Каков наилучший способ извлечь информацию о соединении с базой данных и передать ее в контейнер?

env_vars = {'database_usr': 'xxx', 'database_pas': 'xxx'}
KubernetesPodOperator(
        dag=dag,
        task_id="example-task",
        name="example-task",
        namespace="default",
        image="eu.gcr.io/repo/image:tag",
        image_pull_policy="Always",
        arguments=["-v", "image-command", "image-arg"],
        env_vars=env_vars,
    )

1 Ответ

1 голос
/ 17 марта 2020

Мое текущее решение состоит в том, чтобы получить переменные из соединения, используя BaseHook:

from airflow.hooks.base_hook import BaseHook


def connection_to_dict(connection_id):
    """Returns connection params from Airflow as a dictionary.

    Parameters
    ----------
    connection_id : str
        Name of the connection in Airflow, e.g. `mysql_default`

    Returns
    -------
    dict
        Unencrypted values.
    """
    conn_obj = BaseHook.get_connection(connection_id)
    d = conn_obj.__dict__
    if ('is_encrypted', True) in d.items():
        d['password'] = conn_obj.get_password()
    return d

, а затем передать их в качестве переменных среды оператору pod Kubernetes.

...