Воздушный поток - предопределите переменные и соединения в файле - PullRequest
0 голосов
/ 04 октября 2019

Можно ли заранее определить переменные, соединения и т. Д. В файле, чтобы они загружались при запуске Airflow? Установка их через пользовательский интерфейс не очень удобна с точки зрения развертывания.

Приветствия

Терри

1 Ответ

0 голосов
/ 04 октября 2019

Я рад, что кто-то задал этот вопрос. Фактически, поскольку Airflow полностью предоставляет базовые SQLAlchemy модели конечному пользователю, программные манипуляции (создание, обновление и удаление) всех моделей Airflow, особенно тех, которые используются для предоставления конфигураций , таких какConnection & Variable возможно.

Это может быть не очень очевидно, но природа Airflow с открытым исходным кодом означает, что секретов нет: выпросто надо заглядывать покрепче. В частности, для этих сценариев использования, я всегда находил cli.py быть очень полезной точкой отсчета.


Так вот фрагмент кода я использую, чтобы создать все Соединения MySQL при настройке Airflow. Поставляемый входной файл имеет формат JSON с заданной структурой.

# all imports
import json
from typing import List, Dict, Any, Optional

from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from sqlalchemy.orm import exc

# trigger method
def create_mysql_conns(file_path: str) -> None:
    """
    Reads MySQL connection settings from a given JSON file and
    persists it in Airflow's meta-db. If connection for same
    db already exists, it is overwritten

    :param file_path: Path to JSON file containing MySQL connection settings
    :type file_path:  str
    :return:          None
    :type:            None
    """
    with open(file_path) as json_file:
        json_data: List[Dict[str, Any]] = json.load(json_file)
        for settings_dict in json_data:
            db_name: str = settings_dict["db"]
            conn_id: str = "mysql.{db_name}".format(db_name=db_name)
            mysql_conn: Connection = Connection(conn_id=conn_id,
                                                conn_type="mysql",
                                                host=settings_dict["host"],
                                                login=settings_dict["user"],
                                                password=settings_dict["password"],
                                                schema=db_name,
                                                port=settings_dict.get("port", mysql_conn_description["port"]))
            create_and_overwrite_conn(conn=mysql_conn)


# utility delete method
@provide_session
def delete_conn_if_exists(conn_id: str, session: Optional[Session] = None) -> bool:
    # Code snippet borrowed from airflow.bin.cli.connections(..)
    try:
        to_delete: Connection = (session
                                 .query(Connection)
                                 .filter(Connection.conn_id == conn_id)
                                 .one())
    except exc.NoResultFound:
        return False
    except exc.MultipleResultsFound:
        return False
    else:
        session.delete(to_delete)
        session.commit()
        return True


# utility overwrite method
@provide_session
def create_and_overwrite_conn(conn: Connection, session: Optional[Session] = None) -> None:
    delete_conn_if_exists(conn_id=conn.conn_id)
    session.add(conn)
    session.commit()

структура входного файла JSON

[
    {
        "db": "db_1",
        "host": "db_1.hostname.com",
        "user": "db_1_user",
        "password": "db_1_passwd"
    },
    {
        "db": "db_2",
        "host": "db_2.hostname.com",
        "user": "db_2_user",
        "password": "db_2_passwd"
    }
]

Ссылочные ссылки

...