У меня есть AWS шаблон Cloudformation, который создает базовую c среду воздушного потока (один экземпляр EC2 t3.small содержит и веб-сервер, и планировщик, без внешней БД, без исполнителя Celery). Эта среда подключается к хранилищу данных Snowflake для отправки файлов sh из S3 в базы данных на Snowflake. Я успешно создал среду воздушного потока на своем экземпляре EC2, и самое последнее, что я делаю в CFT, - это активирую планировщик воздушного потока. Журналы, кажется, указывают на чистый старт, и веб-сервер запускается без проблем. при запуске тестовой группы доступности базы данных для подключения к Snowflake, я получаю эту ошибку:
[2020-02-11 11:23:25,422] {__init__.py:1580} ERROR - 250001 (08001): None: Failed to connect to DB: xxxxxxxxxxx.us-east-1.snowflakecomputing.com:443. IP [69995088867598] is not allowed to access Snowflake. Contact your local security administrator.
Вот что мы знаем:
1) IP-адрес от EC2 в порядке, это не занесен в черный список или не включен в белый список на стороне Snowflake, поэтому эта ошибка немного озадачивает.
2) Ручной запуск сценария Python вне Airflow работает нормально - подключение к Snowflake происходит должным образом.
3) Убить планировщик воздушного потока, запустить его обратно через «планировщик воздушного потока -D», а затем повторно запустить DAG, что приведет к успешному запуску.
4) В журналах не указано никаких других ошибок, кроме тех, что я выложил выше.
Вот часть пользовательских данных моего CFT:
'Fn::Base64':
!Sub |
#!/bin/bash -x
exec > /tmp/user-data.log 2>&1
# Path settings
export HOME=/root
export AIRFLOW_HOME=$HOME/airflow
export PATH=$PATH:/root/airflow:/root/airflow/bin
# Egress proxy and exceptions
# Step 1: Install Python version
apt-get update -y
apt-get install build-essential checkinstall -y
apt-get install libssl-dev openssl zlib1g-dev libffi-dev libsqlite3-dev libpq-dev postgresql postgresql-client python3-psycopg2 -y
wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz
tar xzvf Python-3.7.3.tgz
cd Python-3.7.3
./configure
make
make altinstall
# Step 2: Create virtual environment with new Python version
cd ~
python3.7 -m venv airflow
# Create pip.conf
echo '[global]
proxy = http://http.proxy.com:8000
index-url = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple
index = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple' > $AIRFLOW_HOME/pip.conf
# Allow these ports through the ufw firewall
sudo ufw allow 8080
# Upgrade Pip
$AIRFLOW_HOME/bin/pip install --upgrade pip
/usr/local/bin/aws s3 sync s3://${S3CDAPBucket}/dags $AIRFLOW_HOME/dags/
# Install required PIP packages into virtual environment
$AIRFLOW_HOME/bin/pip install -r $AIRFLOW_HOME/dags/requirements.txt --no-cache-dir --retries 10
# Setup airflow local db; edit config file as needed
$AIRFLOW_HOME/bin/airflow initdb
sed -i 's/load_examples\s=\sTrue/load_examples = False/g' $AIRFLOW_HOME/airflow.cfg
sed -i 's/default_dag_run_display_number\s=\s25/default_dag_run_display_number = 5/g' $AIRFLOW_HOME/airflow.cfg
sed -i "/remote_logging\s=\sFalse/c\remote_logging = True" $AIRFLOW_HOME/airflow.cfg
sed -i "/remote_base_log_folder\s=/c\remote_base_log_folder = s3://${S3CDAPBucket}/logs" $AIRFLOW_HOME/airflow.cfg
# Re-init the database to use the postgres instance; start scheduler and webserver
$AIRFLOW_HOME/bin/airflow pool -i $AIRFLOW_HOME/dags/config/airflow/airflow-pool-config.json
sed -i "/snowflake_batch_password/c\ \"snowflake_batch_password\" : \"${SnowflakeBatchPassword}\"," $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json
$AIRFLOW_HOME/bin/airflow variables -i $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json
sudo apt autoremove -y
chgrp -R cloud-user /root
chmod -R g+rwx /root
$AIRFLOW_HOME/bin/airflow webserver -D
$AIRFLOW_HOME/bin/airflow scheduler -D
DAG Проверка подключения к снежинке выглядит следующим образом:
from airflow import DAG
from datetime import datetime, timedelta
import json
import os
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import AirflowException
from snowflake import snowflake_hook
from aws.s3 import FileOps
import boto3
from dag_admin import common_dag_tasks, landing_zone_dag_tasks, raw_zone_dag_tasks
from logger import logger
from functools import partial
import traceback
from data_validation import batch_validation
from snowflake.snowflake_hook import SnowflakeHook
def snowflake_task():
my_snowflake = SnowflakeHook(account='xxxxxxxxxxx.us-east-1',
username='SNOWFLAKE_DEV_BATCH_USER',
password='password',
warehouse='DEV_BATCH_XSMALL',
role='DEV_DB_DEVELOPER_ROLE')
conn = my_snowflake.get_conn()
cur = conn.cursor()
try:
cur.execute('select count(*) as row_cnt from DEV.LANDING__MST_DC_DBO.DC_ACCOUNTING_ITEM__BULK')
for (row_cnt) in cur:
print('Row Count = {}'.format(row_cnt))
finally:
cur.close()
conn.close()
dag_id = 'snowflake-test'
my_dag = DAG(dag_id=dag_id,
start_date=datetime.today(),
schedule_interval=None)
start_task = PythonOperator(
task_id="start-task",
dag=my_dag,
python_callable=snowflake_task)
Requirements.txt:
apache-airflow==1.10.3
Jinja2==2.10.0
Werkzeug==0.14.1
tzlocal==1.5.1
Flask==1.0.4
snowflake-connector-python==2.0.3
inhouse-snowflake==1.0.0
marshmallow-sqlalchemy==0.17.1
inhouse-aws==1.0.0
inhouse-dag-admin==1.0.0
psycopg2==2.8.4
boto3==1.9.253
inhouse-logging==1.0.0
inhouse-data-validation==1.0.0