Я пытаюсь настроить Airflow для управления нашими процессами ETL, запускаю экземпляр EC2 с Amazon Linux 2 AMI, создал пользователя с именем airflow
, переместил мой код в /home/airflow/airflow
(так ~/airflow/dags
и т. Д.), а затем установите его с помощью systemd
следующим образом:
(учетные данные и конфиденциальная информация отредактированы)
Файл среды воздушного потока:
/etc/sysconfig/airflow
:
SCHEDULER_RUNS=5
#Airflow specific settings
AIRFLOW_HOME="/home/airflow/airflow/"
AIRFLOW_CONN_REDSHIFT_CONNECTION=""
AIRFLOW_CONN_S3_CONNECTION=""
AIRFLOW_CONN_S3_LOGS_CONNECTION=""
AIRFLOW__CORE__FERNET_KEY=""
Файлы конфигурации служб Airflow Systemd:
In /usr/lib/systemd/system/
:
airflow-scheduler.service
(по ссылке с /home/airflow/.airflow_config/
-rw-r--r-- 1 root root 1.3K Feb 21 16:18 airflow-scheduler.service
)
[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service
[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/bin/bash -c ' source /home/airflow/.env/bin/activate ; source /home/airflow/.bashrc ; airflow scheduler'
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target
airflow-webserver.service
:
(символическая ссылка от /home/airflow/.airflow_config/
-rw-r--r-- 1 root root 1.4K Feb 20 14:38 airflow-webserver.service
)
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service
[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/bin/bash -c 'source /home/airflow/.env/bin/activate ; source /home/airflow/.bashrc ; airflow webserver -p 8080 --pid /run/airflow/webserver.pid'
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target
Воздушный потокПользователь .bashrc Файл:
/home/airflow/.bashrc
:
# .bashrc
# Source global definitions
if [ -f /etc/bashrc ]; then
. /etc/bashrc
fi
# User specific aliases and functions
# Aliases
alias python="python3"
alias pip="pip3"
alias airflow_venv="source $HOME/.env/bin/activate"
#Airflow specific settings
export AIRFLOW_HOME="/home/airflow/airflow/"
export AIRFLOW_CONN_REDSHIFT_CONNECTION=""
export AIRFLOW_CONN_S3_CONNECTION=""
export AIRFLOW_CONN_S3_LOGS_CONNECTION=""
export AIRFLOW__CORE__FERNET_KEY=""
# Credentials
export EXTERNAL_SERVICE_CREDENTIAL=""
export EXTERNAL_SERVICE_PASSWORD=""
Файл конфигурации воздушного потока:
airflow.cfg
: (символическая ссылка /home/airflow/.airflow_config/
-rw-r--r-- 1 airflow airflow 5.3K Feb 12 17:45 airflow.cfg
[core]
airflow_home = /home/airflow/airflow
dags_folder = /home/airflow/airflow/dags
base_log_folder = /home/airflow/airflow/logs
plugins_folder = /home/airflow/airflow/plugins
sql_alchemy_conn =
child_process_log_directory = /home/airflow/airflow/logs/scheduler
executor = LocalExecutor
remote_logging = True
remote_log_conn_id = s3_logs_connection
remote_base_log_folder = s3://my-bucket-here
encrypt_s3_logs = False
Параметры DAG по умолчанию:
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'retry_on_failure': True,
'task_concurrency': 1,
'start_date': datetime(2019, 2, 19),
'max_active_runs': 1}
dag_name_here = DAG(
"dag_name_here", default_args=default_args, schedule_interval=timedelta(days=1))
Теперь у меня проблема в том, что внутри DAG с несколькими задачами каждая использует свою переменную среды os,либо учетные данные (определенные только в .bashrc
), либо соединениеon (определяется как в /etc/sysconfig/airflow
и .bashrc
), иногда первое задание всегда завершается неудачно, иногда это второе, иногда третье и т. д., что означает, что для обратной засыпки я мог бы видеть 3 DagRuns параллельноесли первая задача в группе обеспечения доступности баз данных работает правильно, а вторая не смогла получить переменную env.
Задача может быть, например, CreateStagingRedshiftTable
, она может быть выполнена успешно, а затем может выдаться следующая PopulateStagingTable
ошибка Connection does not exist
, даже если они используют один и тот же.
Я пробовал использовать нет, одинарные и двойные кавычки в envfile, экспортируя и не экспортируя переменные в .bashrc и .bash_profile, используя profile.d
, и я постоянно работаю systemctl daemon-reload && systemctl restart airflow-scheduler.service && systemctl restart airflow-webserver.service
Любые идеи или помощь приветствуются.