Airflow SystemsManagerParameterStoreBackend is not working
0 votes / 05 May 2020

I have an airflow.cfg file with a secrets section that looks like this:

[secrets]
backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "profile_name": "myprofile"}
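
For reference, the same two settings can also be supplied through Airflow's AIRFLOW__{SECTION}__{KEY} environment-variable convention, which is sometimes easier to verify inside a container; note that the [secrets] section and this backend only exist as of Airflow 1.10.10:

export AIRFLOW__SECRETS__BACKEND=airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_prefix": "/airflow/connections", "profile_name": "myprofile"}'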

I also have an entrypoint.sh file that I use to set up connections and run airflow commands:

#!/usr/bin/env bash

# User-provided configuration must always be respected.
#
# Therefore, this script must only derive Airflow AIRFLOW__ variables from other
# variables when the user did not provide their own configuration.

TRY_LOOP="20"

# Global defaults and back-compat
: "${AIRFLOW_HOME:="/usr/local/airflow"}"
: "${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")}}"
: "${AIRFLOW__CORE__EXECUTOR:="LocalExecutor"}"

# Load DAGs examples (default: Yes)
if [[ -z "$AIRFLOW__CORE__LOAD_EXAMPLES" && "${LOAD_EX:=n}" == n ]]; then
  AIRFLOW__CORE__LOAD_EXAMPLES=False
fi

export \
  AIRFLOW_HOME \
  AIRFLOW__CORE__EXECUTOR \
  AIRFLOW__CORE__FERNET_KEY \
  AIRFLOW__CORE__LOAD_EXAMPLES

# Install custom python package if requirements.txt is present
if [ -e "/requirements.txt" ]; then
    $(command -v pip) install --user -r /requirements.txt
fi

wait_for_port() {
  local name="$1" host="$2" port="$3"
  local j=0
  while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do
    j=$((j+1))
    if [ $j -ge $TRY_LOOP ]; then
      echo >&2 "$(date) - $host:$port still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for $name... $j/$TRY_LOOP"
    sleep 5
  done
}
printf 'var printing'
echo $AIRFLOW__CORE__EXECUTOR
# Executors other than SequentialExecutor require an SQL database; PostgreSQL is used here
if [ "$AIRFLOW__CORE__EXECUTOR" != "SequentialExecutor" ]; then
  printf 'printenv '
  printenv
  exit 1
  # Check if the user has provided explicit Airflow configuration concerning the database
  if [ -z "$AIRFLOW__CORE__SQL_ALCHEMY_CONN" ]; then
    AIRFLOW__CORE__SQL_ALCHEMY_CONN=AIRFLOW_CONN_PROD_RDS
    export AIRFLOW__CORE__SQL_ALCHEMY_CONN

    # Check if the user has provided explicit Airflow configuration for the broker's connection to the database
    if [ "$AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor" ]; then
      AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}${POSTGRES_EXTRAS}"
      export AIRFLOW__CELERY__RESULT_BACKEND
    fi
  else
    if [[ "$AIRFLOW__CORE__EXECUTOR" == "CeleryExecutor" && -z "$AIRFLOW__CELERY__RESULT_BACKEND" ]]; then
      >&2 printf '%s\n' "FATAL: if you set AIRFLOW__CORE__SQL_ALCHEMY_CONN manually with CeleryExecutor you must also set AIRFLOW__CELERY__RESULT_BACKEND"
      exit 1
    fi

    # Derive useful variables from the AIRFLOW__ variables provided explicitly by the user
    POSTGRES_ENDPOINT=$(echo -n "$AIRFLOW__CORE__SQL_ALCHEMY_CONN" | cut -d '/' -f3 | sed -e 's,.*@,,')
    POSTGRES_HOST=$(echo -n "$POSTGRES_ENDPOINT" | cut -d ':' -f1)
    POSTGRES_PORT=$(echo -n "$POSTGRES_ENDPOINT" | cut -d ':' -f2)
  fi

  wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT"
fi

# CeleryExecutor drives the need for a Celery broker, here Redis is used
if [ "$AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor" ]; then
  # Check if the user has provided explicit Airflow configuration concerning the broker
  if [ -z "$AIRFLOW__CELERY__BROKER_URL" ]; then
    # Default values corresponding to the default compose files
    : "${REDIS_PROTO:="redis://"}"
    : "${REDIS_HOST:="redis"}"
    : "${REDIS_PORT:="6379"}"
    : "${REDIS_PASSWORD:=""}"
    : "${REDIS_DBNUM:="1"}"

    # When Redis is secured by basic auth it ignores the username part; only a password/token is used
    if [ -n "$REDIS_PASSWORD" ]; then
      REDIS_PREFIX=":${REDIS_PASSWORD}@"
    else
      REDIS_PREFIX=
    fi

    AIRFLOW__CELERY__BROKER_URL="${REDIS_PROTO}${REDIS_PREFIX}${REDIS_HOST}:${REDIS_PORT}/${REDIS_DBNUM}"
    export AIRFLOW__CELERY__BROKER_URL
  else
    # Derive useful variables from the AIRFLOW__ variables provided explicitly by the user
    REDIS_ENDPOINT=$(echo -n "$AIRFLOW__CELERY__BROKER_URL" | cut -d '/' -f3 | sed -e 's,.*@,,')
    REDIS_HOST=$(echo -n "$REDIS_ENDPOINT" | cut -d ':' -f1)
    REDIS_PORT=$(echo -n "$REDIS_ENDPOINT" | cut -d ':' -f2)
  fi

  wait_for_port "Redis" "$REDIS_HOST" "$REDIS_PORT"
fi

case "$1" in
  webserver)
    airflow initdb
    if [ "$AIRFLOW__CORE__EXECUTOR" = "LocalExecutor" ] || [ "$AIRFLOW__CORE__EXECUTOR" = "SequentialExecutor" ]; then
      # With the "Local" and "Sequential" executors it should all run in one container.
      airflow scheduler &
    fi
    exec airflow webserver
    ;;
  worker|scheduler)
    # Give the webserver time to run initdb.
    sleep 10
    exec airflow "$@"
    ;;
  flower)
    sleep 10
    exec airflow "$@"
    ;;
  version)
    exec airflow "$@"
    ;;
  *)
    # The command is something like bash, not an airflow subcommand. Just run it in the right environment.
    exec "$@"
    ;;
esac

As you can see, there is a printenv statement in there. It prints:

printenv PYTHONUNBUFFERED=1
LC_ALL=en_US.UTF-8
LANG=en_US.UTF-8
HOSTNAME=dsdsasae2w
AIRFLOW_HOME=/usr/local/airflow
SLUGIFY_USES_TEXT_UNIDECODE=yes
PWD=/
HOME=/root
AIRFLOW__CORE__LOAD_EXAMPLES=False
SHLVL=1
LANGUAGE=en_US:en
PYTHONPATH=:/
AIRFLOW__CORE__FERNET_KEY=0ynI7-c_SHqsJSgK7XeFBzquIV9iJkfDL4Y1bkNX5bA=
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
AIRFLOW__CORE__EXECUTOR=LocalExecutor
_=/usr/bin/printenv

So there is no variable at all corresponding to the AWS Systems Manager secrets backend configured for Airflow.

The problem, then, is that inside entrypoint.sh I cannot read any environment variable for the parameter stored in AWS Systems Manager, despite the airflow.cfg configuration.
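
If I understand the docs correctly, a secrets backend is only consulted when Airflow itself looks up a connection; it does not export AIRFLOW_CONN_* variables into the process environment, so perhaps printenv is not the right check. A minimal way to test the lookup itself, assuming Airflow >= 1.10.10 and boto3 are present in the container (the Dockerfile below guarantees neither), would be:

docker exec -it my-container python -c "from airflow.hooks.base_hook import BaseHook; print(BaseHook.get_connection('prod_rds').get_uri())"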

The parameter is of type SecureString and is named /airflow/connections/prod_rds. Its value is a valid connection URI of the form postgresql+psycopg2://user:pwd@hostname:port/db.
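
To rule out the parameter itself, it can be read back outside of Airflow; a sketch using the AWS CLI with the same profile (assuming the CLI is installed and configured):

aws ssm get-parameter \
  --name "/airflow/connections/prod_rds" \
  --with-decryption \
  --profile myprofile \
  --query 'Parameter.Value' \
  --output text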

The connection itself is valid: it has been tested from a database client.

All of this runs in Docker with the following setup.

Dockerfile:

FROM ubuntu
ENV PYTHONUNBUFFERED 1

ARG AIRFLOW_VERSION=1.10.1
ARG AIRFLOW_USER_HOME=/usr/local/airflow

ENV AIRFLOW_HOME=${AIRFLOW_USER_HOME}
ENV PYTHONPATH "${PYTHONPATH}:/"
ENV SLUGIFY_USES_TEXT_UNIDECODE=yes



RUN apt-get update && apt-get install -y python3-pip mysql-server vim
RUN apt-get clean && apt-get update && apt-get install -y locales
RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8


RUN set -ex \
    && buildDeps=' \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
    ' \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        default-libmysqlclient-dev \
        apt-utils \
        curl \
        rsync \
        netcat \
        locales \
    && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
    && locale-gen \
    && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
    #&& useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
    && pip install -U setuptools wheel \
    && pip install pytz \
    && pip install pyOpenSSL \
    && pip install ndg-httpsclient \
    && pip install pyasn1 \
    && pip install apache-airflow[crypto,postgres,ssh]==${AIRFLOW_VERSION} \
    && pip install 'redis==3.2' \
    && if [ -n "${PYTHON_DEPS}" ]; then pip install ${PYTHON_DEPS}; fi \
    && apt-get purge --auto-remove -yqq $buildDeps \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf \
        /var/lib/apt/lists/* \
        /tmp/* \
        /var/tmp/* \
        /usr/share/man \
        /usr/share/doc \
        /usr/share/doc-base

RUN pip install --user psycopg2-binary
RUN pip install -r requirements.pip

ADD ./ ./

RUN chmod -R 777 ${AIRFLOW_USER_HOME}
RUN chmod -R 777 /entrypoint.sh


WORKDIR "/"
ENTRYPOINT ["/entrypoint.sh"]
CMD ["webserver"]

docker-compose.yml:

version: '3'
services:
  webserver:
    build:
      context: .
      dockerfile: Dockerfile3
    image: myws
    container_name: my-container
    volumes:
      - ./f1:/f1
      - ./dags:/usr/local/airflow/dags
      - ./logs:/usr/local/airflow/logs
    ports:
      - "8081:8080"
    command: webserver
  scheduler:
    container_name: mysch
    build:
      context: .
      dockerfile: Dockerfile3
    volumes:
      - ./f1:/f1
      - ./dags:/usr/local/airflow/dags
      - ./logs:/usr/local/airflow/logs
    ports:
      - "8793:8793"
    command: scheduler
...
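
Finally, backend_kwargs references profile_name "myprofile", so the shared AWS credentials file must also be visible inside both containers. A hedged sketch of what each service might need (host path and region are assumptions):

    volumes:
      - ~/.aws:/root/.aws:ro
    environment:
      - AWS_DEFAULT_REGION=eu-west-1  # assumed region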