Docker-compose с Airflow - MS SQL Server (соединение не установлено) - PullRequest
0 голосов
/ 29 мая 2019

Я не могу подключиться к SQL Server внутри Airflow с помощью docker-compose.Я хочу получить данные из SQL Server напрямую в облачное хранилище, а затем данные будут отправлены в Big Query.

Как решить эту проблему?

import json
from datetime import timedelta, datetime

from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator


default_args = {
    'owner': 'Test Data',
    'depends_on_past': True,    
    'start_date': datetime(2019, 5, 29),
    'end_date': datetime(2019, 5, 30),
    'email': ['email@clientx.com.br'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Set Schedule: Run pipeline once a day. 
# Use cron to define exact time. Eg. 8:15am would be "15 08 * * *"
schedule_interval = "* * * * *"

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    'bigquery_github_trends', 
    default_args=default_args, 
    schedule_interval=schedule_interval
    )


extract = MySqlToGoogleCloudStorageOperator(
    task_id='chama_extract',
    mysql_conn_id='mysql_hml',
    google_cloud_storage_conn_id='my_gcp_conn',
    sql="""SELECT * FROM test""",
    bucket='my_bucket',
    filename='test/test{}.json',
    schema_filename='schemas/test.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
            task_id='chama_load',
            bigquery_conn_id='my_gcp_conn',
            google_cloud_storage_conn_id='my_gcp_conn',
            bucket='my_bucket',
            destination_project_dataset_table="tst.teste123",
            source_objects=['test/test0.json'],
            schema_object='schemas/test.json',
            source_format='NEWLINE_DELIMITED_JSON',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)


# Setting up Dependencies
load.set_upstream(extract)

Docker-compose.yml

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        AIRFLOW_DEPS: gcp_api,s3        
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

docker-compose-gcloud.yml

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    build:
      context: https://github.com/puckel/docker-airflow.git#1.10.1
      dockerfile: Dockerfile
      args:
        AIRFLOW_DEPS: gcp_api,s3        
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/gcloud-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

и выполнить в docker команду:

docker-compose -f docker-compose-gcloud.yml up --abort-on-контейнер-выход

Сообщение об ошибке в потоке воздуха:

[2019-05-29 07: 00: 37,938] {{logging_mixin.py:95}} INFO - [2019-05-29 07: 00: 37,937] {{base_hook.py:83}} INFO - Использование соединения с: 10.0.0.1 [2019-05-29 07: 00: 58,974] {{models.py:1760}}ОШИБКА - (2003, «Не удается подключиться к серверу MySQL на 10.0.0.1 (111« Отказано в соединении »)»)Traceback (последний вызов был последним):Файл "/usr/local/lib/python3.6/site-packages/airflow/models.py", строка 1659, в _run_raw_taskresult = task_copy.execute (context = context)Файл "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", строка 105, в исполнениикурсор = self._query_mysql ()Файл "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", строка 127, в _query_mysqlconn = mysql.get_conn ()Файл "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", строка 103, в get_connconn = MySQLdb.connect (** conn_config)Файл "/usr/local/lib/python3.6/site-packages/MySQLdb/init.py", строка 84, в Connectобратное соединение (* args, ** kwargs)Файл "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", строка 164, в init super (Соединение, self). init (* args, ** kwargs2)MySQLdb._exceptions.OperationalError: (2003, «Не удается подключиться к серверу MySQL на 10.0.0.1 (111« Отказано в соединении »)»)[2019-05-29 07: 00: 58,988] {{models.py:1789}} ИНФОРМАЦИЯ - все повторные попытки не выполнены;пометить задачу как невыполненную[2019-05-29 07: 00: 58,992] {{logging_mixin.py:95}} INFO - [2019-05-29 07: 00: 58,991] {{configuration.py:255}} ПРЕДУПРЕЖДЕНИЕ - раздел / ключ [smtp / smtp_user] не найден в конфигурации[2019-05-29 07: 00: 58,998] {{models.py:1796}} ОШИБКА - [Errno 99] Невозможно назначить запрошенный адресTraceback (последний вызов был последним):Файл "/usr/local/lib/python3.6/site-packages/airflow/models.py", строка 1659, в _run_raw_taskresult = task_copy.execute (context = context)Файл "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", строка 105, в исполнениикурсор = self._query_mysql ()Файл "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/mysql_to_gcs.py", строка 127, в _query_mysqlconn = mysql.get_conn ()Файл "/usr/local/lib/python3.6/site-packages/airflow/hooks/mysql_hook.py", строка 103, в get_connconn = MySQLdb.connect (** conn_config)Файл "/usr/local/lib/python3.6/site-packages/MySQLdb/init.py", строка 84, в Connectобратное соединение (* args, ** kwargs)Файл "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", строка 164, в init super (Соединение, self). init (* args, ** kwargs2)MySQLdb._exceptions.OperationalError: (2003, «Не удается подключиться к серверу MySQL на 10.0.0.1 (111« Отказано в соединении »)»)

1 Ответ

0 голосов
/ 30 мая 2019

Из-за ошибки ключевой частью мне кажется кусок get_conn. Это указывает на то, что когда airflow пытается установить соединение с базой данных, происходит сбой. Это означает, что либо ваше соединение не указано (похоже, что оно может быть), либо что какая-то его часть неверна.

Вы должны проверить правильность пароля, адреса сервера и порта. Они должны быть либо в файле airflow.cfg, либо в качестве переменных среды, либо в веб-сервере (панель администратора)

...