Не удается подключиться к Snowflake с недавно созданной средой Airflow на EC2 - PullRequest
0 голосов
/ 11 февраля 2020

У меня есть AWS шаблон Cloudformation, который создает базовую c среду воздушного потока (один экземпляр EC2 t3.small содержит и веб-сервер, и планировщик, без внешней БД, без исполнителя Celery). Эта среда подключается к хранилищу данных Snowflake для отправки файлов sh из S3 в базы данных на Snowflake. Я успешно создал среду воздушного потока на своем экземпляре EC2, и самое последнее, что я делаю в CFT, - это активирую планировщик воздушного потока. Журналы, кажется, указывают на чистый старт, и веб-сервер запускается без проблем. при запуске тестовой группы доступности базы данных для подключения к Snowflake, я получаю эту ошибку:

[2020-02-11 11:23:25,422] {__init__.py:1580} ERROR - 250001 (08001): None: Failed to connect to DB: xxxxxxxxxxx.us-east-1.snowflakecomputing.com:443. IP [69995088867598] is not allowed to access Snowflake.  Contact your local security administrator.

Вот что мы знаем:

1) IP-адрес от EC2 в порядке, это не занесен в черный список или не включен в белый список на стороне Snowflake, поэтому эта ошибка немного озадачивает.

2) Ручной запуск сценария Python вне Airflow работает нормально - подключение к Snowflake происходит должным образом.

3) Убить планировщик воздушного потока, запустить его обратно через «планировщик воздушного потока -D», а затем повторно запустить DAG, что приведет к успешному запуску.

4) В журналах не указано никаких других ошибок, кроме тех, что я выложил выше.

Вот часть пользовательских данных моего CFT:

    'Fn::Base64':
      !Sub |
      #!/bin/bash -x
      exec > /tmp/user-data.log 2>&1

      # Path settings
      export HOME=/root
      export AIRFLOW_HOME=$HOME/airflow
      export PATH=$PATH:/root/airflow:/root/airflow/bin

      # Egress proxy and exceptions

      # Step 1: Install Python version
      apt-get update -y
      apt-get install build-essential checkinstall -y
      apt-get install libssl-dev openssl zlib1g-dev libffi-dev libsqlite3-dev libpq-dev postgresql postgresql-client python3-psycopg2 -y
      wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz
      tar xzvf Python-3.7.3.tgz
      cd Python-3.7.3
      ./configure
      make
      make altinstall

      # Step 2: Create virtual environment with new Python version
      cd ~
      python3.7 -m venv airflow

      # Create pip.conf
      echo '[global]
      proxy = http://http.proxy.com:8000
      index-url = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple
      index = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple' > $AIRFLOW_HOME/pip.conf

      # Allow these ports through the ufw firewall
      sudo ufw allow 8080

      # Upgrade Pip
      $AIRFLOW_HOME/bin/pip install --upgrade pip
      /usr/local/bin/aws s3 sync s3://${S3CDAPBucket}/dags $AIRFLOW_HOME/dags/

      # Install required PIP packages into virtual environment
      $AIRFLOW_HOME/bin/pip install -r $AIRFLOW_HOME/dags/requirements.txt --no-cache-dir --retries 10

      # Setup airflow local db; edit config file as needed
      $AIRFLOW_HOME/bin/airflow initdb
      sed -i 's/load_examples\s=\sTrue/load_examples = False/g' $AIRFLOW_HOME/airflow.cfg
      sed -i 's/default_dag_run_display_number\s=\s25/default_dag_run_display_number = 5/g' $AIRFLOW_HOME/airflow.cfg
      sed -i "/remote_logging\s=\sFalse/c\remote_logging = True" $AIRFLOW_HOME/airflow.cfg
      sed -i "/remote_base_log_folder\s=/c\remote_base_log_folder = s3://${S3CDAPBucket}/logs" $AIRFLOW_HOME/airflow.cfg

      # Re-init the database to use the postgres instance; start scheduler and webserver
      $AIRFLOW_HOME/bin/airflow pool -i $AIRFLOW_HOME/dags/config/airflow/airflow-pool-config.json
      sed -i "/snowflake_batch_password/c\   \"snowflake_batch_password\" : \"${SnowflakeBatchPassword}\"," $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json

      $AIRFLOW_HOME/bin/airflow variables -i $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json

      sudo apt autoremove -y
      chgrp -R cloud-user /root
      chmod -R g+rwx /root
      $AIRFLOW_HOME/bin/airflow webserver -D
      $AIRFLOW_HOME/bin/airflow scheduler -D

DAG Проверка подключения к снежинке выглядит следующим образом:

from airflow import DAG
from datetime import datetime, timedelta
import json
import os
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import AirflowException
from snowflake import snowflake_hook
from aws.s3 import FileOps
import boto3
from dag_admin import common_dag_tasks, landing_zone_dag_tasks, raw_zone_dag_tasks
from logger import logger
from functools import partial
import traceback
from data_validation import batch_validation
from snowflake.snowflake_hook import SnowflakeHook

def snowflake_task():
    my_snowflake = SnowflakeHook(account='xxxxxxxxxxx.us-east-1',
                                            username='SNOWFLAKE_DEV_BATCH_USER',
                                            password='password',
                                            warehouse='DEV_BATCH_XSMALL',
                                            role='DEV_DB_DEVELOPER_ROLE')
    conn = my_snowflake.get_conn()
    cur = conn.cursor()
    try:
            cur.execute('select count(*) as row_cnt from DEV.LANDING__MST_DC_DBO.DC_ACCOUNTING_ITEM__BULK')
            for (row_cnt) in cur:
                print('Row Count = {}'.format(row_cnt))
    finally:
            cur.close()

    conn.close()


dag_id = 'snowflake-test'
my_dag = DAG(dag_id=dag_id,
             start_date=datetime.today(),
             schedule_interval=None)

start_task = PythonOperator(
                        task_id="start-task",
                        dag=my_dag,
                        python_callable=snowflake_task)

Requirements.txt:

apache-airflow==1.10.3
Jinja2==2.10.0
Werkzeug==0.14.1
tzlocal==1.5.1
Flask==1.0.4
snowflake-connector-python==2.0.3
inhouse-snowflake==1.0.0
marshmallow-sqlalchemy==0.17.1
inhouse-aws==1.0.0
inhouse-dag-admin==1.0.0
psycopg2==2.8.4
boto3==1.9.253
inhouse-logging==1.0.0
inhouse-data-validation==1.0.0

1 Ответ

0 голосов
/ 11 февраля 2020

Судя по ошибке, возможно, в вашей учетной записи Snowflake включена политика сети. Вам необходимо получить опубликованный c IP-адрес вашего компьютера EC2 и добавить его к разрешенным IP-адресам.

Здесь полное руководство. Кроме того, вам необходимо рассмотреть возможность использования Elasti c -IP и подключения к вашей машине, таким образом, IP-адрес publi c не изменится, даже если ваша машина отключена, вы все равно можете подключить publicIP.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...