I have an airflow.cfg file with a [secrets] section that looks like this:
[secrets]
backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend
backend_kwargs = {"connections_prefix": "/airflow/connections", "profile_name": "myprofile"}
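If I understand the backend_kwargs above correctly, for a connection id prod_rds the backend reads the parameter connections_prefix + "/prod_rds" from Parameter Store using the myprofile profile. A quick way to check that the parameter is readable from a shell (a sketch, assuming the AWS CLI is installed and myprofile is allowed to call ssm:GetParameter):
aws ssm get-parameter \
  --profile myprofile \
  --name /airflow/connections/prod_rds \
  --with-decryption \
  --query 'Parameter.Value' \
  --output text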
I also have an entrypoint.sh file that I use to set up connections and run Airflow commands:
#!/usr/bin/env bash
# User-provided configuration must always be respected.
#
# Therefore, this script only derives Airflow AIRFLOW__ variables from other variables
# when the user did not provide their own configuration.
TRY_LOOP="20"
# Global defaults and back-compat
: "${AIRFLOW_HOME:="/usr/local/airflow"}"
: "${AIRFLOW__CORE__FERNET_KEY:=${FERNET_KEY:=$(python -c "from cryptography.fernet import Fernet; FERNET_KEY = Fernet.generate_key().decode(); print(FERNET_KEY)")}}"
: "${AIRFLOW__CORE__EXECUTOR:="LocalExecutor"}"
# Load DAGs examples (default: Yes)
if [[ -z "$AIRFLOW__CORE__LOAD_EXAMPLES" && "${LOAD_EX:=n}" == n ]]; then
AIRFLOW__CORE__LOAD_EXAMPLES=False
fi
export \
AIRFLOW_HOME \
AIRFLOW__CORE__EXECUTOR \
AIRFLOW__CORE__FERNET_KEY \
AIRFLOW__CORE__LOAD_EXAMPLES \
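# Note: Airflow treats any AIRFLOW__<SECTION>__<KEY> environment variable as an override of the
# matching airflow.cfg entry, e.g. AIRFLOW__CORE__EXECUTOR maps to "executor" in the [core] section.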
# Install custom python package if requirements.txt is present
if [ -e "/requirements.txt" ]; then
$(command -v pip) install --user -r /requirements.txt
fi
wait_for_port() {
  local name="$1" host="$2" port="$3"
  local j=0
  while ! nc -z "$host" "$port" >/dev/null 2>&1 < /dev/null; do
    j=$((j+1))
    if [ $j -ge $TRY_LOOP ]; then
      echo >&2 "$(date) - $host:$port still not reachable, giving up"
      exit 1
    fi
    echo "$(date) - waiting for $name... $j/$TRY_LOOP"
    sleep 5
  done
}
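# Example: wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT" rechecks every 5 seconds
# and gives up after TRY_LOOP (20) attempts, i.e. after roughly 100 seconds.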
printf 'var printing\n'
echo "$AIRFLOW__CORE__EXECUTOR"
# Other executors than SequentialExecutor drive the need for an SQL database, here PostgreSQL is used
if [ "$AIRFLOW__CORE__EXECUTOR" != "SequentialExecutor" ]; then
printf 'printenv '
printenv
exit 1
# Check if the user has provided explicit Airflow configuration concerning the database
if [ -z "$AIRFLOW__CORE__SQL_ALCHEMY_CONN" ]; then
AIRFLOW__CORE__SQL_ALCHEMY_CONN=AIRFLOW_CONN_PROD_RDS
export AIRFLOW__CORE__SQL_ALCHEMY_CONN
# Check if the user has provided explicit Airflow configuration for the broker's connection to the database
if [ "$AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor" ]; then
AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}${POSTGRES_EXTRAS}"
export AIRFLOW__CELERY__RESULT_BACKEND
fi
else
if [[ "$AIRFLOW__CORE__EXECUTOR" == "CeleryExecutor" && -z "$AIRFLOW__CELERY__RESULT_BACKEND" ]]; then
>&2 printf '%s\n' "FATAL: if you set AIRFLOW__CORE__SQL_ALCHEMY_CONN manually with CeleryExecutor you must also set AIRFLOW__CELERY__RESULT_BACKEND"
exit 1
fi
# Derive useful variables from the AIRFLOW__ variables provided explicitly by the user
POSTGRES_ENDPOINT=$(echo -n "$AIRFLOW__CORE__SQL_ALCHEMY_CONN" | cut -d '/' -f3 | sed -e 's,.*@,,')
POSTGRES_HOST=$(echo -n "$POSTGRES_ENDPOINT" | cut -d ':' -f1)
POSTGRES_PORT=$(echo -n "$POSTGRES_ENDPOINT" | cut -d ':' -f2)
fi
wait_for_port "Postgres" "$POSTGRES_HOST" "$POSTGRES_PORT"
fi
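# Worked example for the derivation above (hypothetical URI): with
# AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pwd@db.example.com:5432/airflow
# the cut/sed pipeline yields POSTGRES_ENDPOINT=db.example.com:5432, so
# POSTGRES_HOST=db.example.com and POSTGRES_PORT=5432.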
# CeleryExecutor drives the need for a Celery broker, here Redis is used
if [ "$AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor" ]; then
# Check if the user has provided explicit Airflow configuration concerning the broker
if [ -z "$AIRFLOW__CELERY__BROKER_URL" ]; then
# Default values corresponding to the default compose files
: "${REDIS_PROTO:="redis://"}"
: "${REDIS_HOST:="redis"}"
: "${REDIS_PORT:="6379"}"
: "${REDIS_PASSWORD:=""}"
: "${REDIS_DBNUM:="1"}"
# When Redis is secured by basic auth, it does not handle the username part of basic auth, only a token
if [ -n "$REDIS_PASSWORD" ]; then
REDIS_PREFIX=":${REDIS_PASSWORD}@"
else
REDIS_PREFIX=
fi
AIRFLOW__CELERY__BROKER_URL="${REDIS_PROTO}${REDIS_PREFIX}${REDIS_HOST}:${REDIS_PORT}/${REDIS_DBNUM}"
export AIRFLOW__CELERY__BROKER_URL
else
# Derive useful variables from the AIRFLOW__ variables provided explicitly by the user
REDIS_ENDPOINT=$(echo -n "$AIRFLOW__CELERY__BROKER_URL" | cut -d '/' -f3 | sed -e 's,.*@,,')
REDIS_HOST=$(echo -n "$POSTGRES_ENDPOINT" | cut -d ':' -f1)
REDIS_PORT=$(echo -n "$POSTGRES_ENDPOINT" | cut -d ':' -f2)
fi
wait_for_port "Redis" "$REDIS_HOST" "$REDIS_PORT"
fi
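# Worked example: with the defaults above and no password the broker URL becomes
# redis://redis:6379/1; with REDIS_PASSWORD=secret it becomes redis://:secret@redis:6379/1.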
case "$1" in
  webserver)
    airflow initdb
    if [ "$AIRFLOW__CORE__EXECUTOR" = "LocalExecutor" ] || [ "$AIRFLOW__CORE__EXECUTOR" = "SequentialExecutor" ]; then
      # With the "Local" and "Sequential" executors it should all run in one container.
      airflow scheduler &
    fi
    exec airflow webserver
    ;;
  worker|scheduler)
    # Give the webserver time to run initdb.
    sleep 10
    exec airflow "$@"
    ;;
  flower)
    sleep 10
    exec airflow "$@"
    ;;
  version)
    exec airflow "$@"
    ;;
  *)
    # The command is something like bash, not an airflow subcommand. Just run it in the right environment.
    exec "$@"
    ;;
esac
As you can see, there is a printenv call in the script, and it returns:
printenv PYTHONUNBUFFERED=1
LC_ALL=en_US.UTF-8
LANG=en_US.UTF-8
HOSTNAME=dsdsasae2w
AIRFLOW_HOME=/usr/local/airflow
SLUGIFY_USES_TEXT_UNIDECODE=yes
PWD=/
HOME=/root
AIRFLOW__CORE__LOAD_EXAMPLES=False
SHLVL=1
LANGUAGE=en_US:en
PYTHONPATH=:/
AIRFLOW__CORE__FERNET_KEY=0ynI7-c_SHqsJSgK7XeFBzquIV9iJkfDL4Y1bkNX5bA=
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
AIRFLOW__CORE__EXECUTOR=LocalExecutor
_=/usr/bin/printenv
So there is no variable corresponding to the AWS Systems Manager secrets backend configured in Airflow.
The problem, then, is that despite the cfg file being set up, I cannot read in my entrypoint.sh any environment variable holding the parameter stored in AWS Systems Manager.
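For clarity, this is the kind of variable I expected the backend to make available in the environment (a hypothetical line, named after Airflow's AIRFLOW_CONN_<CONN_ID> convention, with the example URI from below):
AIRFLOW_CONN_PROD_RDS=postgresql+psycopg2://user:pwd@hostname:port/db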
The parameter is of type SecureString and is named /airflow/connections/prod_rds.
Its value is a valid URI, for example: postgresql+psycopg2://user:pwd@hostname:port/db.
The connection itself works: it has been tested from a database client.
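A quick way to confirm the URI is valid from a shell (a sketch; psql accepts the plain postgresql:// form, so the +psycopg2 driver suffix has to be dropped):
psql "postgresql://user:pwd@hostname:port/db" -c 'SELECT 1'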
All of this runs in Docker with the following setup.
Dockerfile:
FROM ubuntu
ENV PYTHONUNBUFFERED 1
ARG AIRFLOW_VERSION=1.10.1
ARG AIRFLOW_USER_HOME=/usr/local/airflow
ENV AIRFLOW_HOME=${AIRFLOW_USER_HOME}
ENV PYTHONPATH "${PYTHONPATH}:/"
ENV SLUGIFY_USES_TEXT_UNIDECODE=yes
RUN apt-get update && apt-get install -y python3-pip mysql-server vim
RUN apt-get clean && apt-get update && apt-get install -y locales
RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
ENV LANGUAGE en_US:en
ENV LC_ALL en_US.UTF-8
RUN set -ex \
&& buildDeps=' \
freetds-dev \
libkrb5-dev \
libsasl2-dev \
libssl-dev \
libffi-dev \
libpq-dev \
git \
'&& apt-get update -yqq \
&& apt-get upgrade -yqq \
&& apt-get install -yqq --no-install-recommends \
$buildDeps \
freetds-bin \
build-essential \
default-libmysqlclient-dev \
apt-utils \
curl \
rsync \
netcat \
locales \
&& sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
&& locale-gen \
&& update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
#&& useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
&& pip install -U setuptools wheel\
&& pip install pytz \
&& pip install pyOpenSSL \
&& pip install ndg-httpsclient \
&& pip install pyasn1 \
&& pip install apache-airflow[crypto,postgres,ssh]==${AIRFLOW_VERSION} \
&& pip install 'redis==3.2' \
&& if [ -n "${PYTHON_DEPS}" ]; then pip install ${PYTHON_DEPS}; fi \
&& apt-get purge --auto-remove -yqq $buildDeps \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf \
/var/lib/apt/lists/* \
/tmp/* \
/var/tmp/* \
/usr/share/man \
/usr/share/doc \
/usr/share/doc-base
RUN pip install --user psycopg2-binary
ADD ./ ./
RUN pip install -r requirements.pip
RUN chmod -R 777 ${AIRFLOW_USER_HOME}
RUN chmod -R 777 /entrypoint.sh
WORKDIR "/"
ENTRYPOINT ["/entrypoint.sh"]
CMD ["webserver"]
docker-compose.yml:
version: '3'
services:
  webserver:
    build:
      context: .
      dockerfile: Dockerfile3
    image: myws
    container_name: my-container
    volumes:
      - ./f1:/f1
      - ./dags:/usr/local/airflow/dags
      - ./logs:/usr/local/airflow/logs
    ports:
      - "8081:8080"
    command: webserver
  scheduler:
    container_name: mysch
    build:
      context: .
      dockerfile: Dockerfile3
    volumes:
      - ./f1:/f1
      - ./dags:/usr/local/airflow/dags
      - ./logs:/usr/local/airflow/logs
    ports:
      - "8793:8793"
    command: scheduler
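For completeness, this is how the stack can be built and started, and how the webserver's environment can be inspected from the host afterwards (container_name taken from the compose file above; the debug exit 1 in entrypoint.sh has to be removed for the container to stay up):
docker-compose up --build -d
docker exec my-container printenv | grep '^AIRFLOW'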