Проблема с потоком Fernet_Key при попытке запроса базы данных mssql - PullRequest
0 голосов
/ 22 февраля 2019

Я довольно новичок в Airflow.Я несколько раз прочитал документацию, разобрался с многочисленными вопросами S / O и множеством случайных статей в Интернете, но пока не решил эту проблему.У меня ощущение, что это что-то супер простое, я делаю неправильно.У меня есть Docker для Windows, и я вытащил образ puckel/docker-airflow и запустил контейнер с открытыми портами, чтобы я мог получить доступ к интерфейсу с моего хоста.У меня есть другой контейнер, работающий mcr.microsoft.com/mssql/server, на котором я восстановил образец базы данных WideWorldImporters.Благодаря пользовательскому интерфейсу Airflow я смог успешно создать соединение с этой базой данных и даже запросить его в разделе «Профилирование данных».Проверьте изображения ниже: Создание соединения Успешный запрос к соединению

Так что, пока это работает, мой dag не выполняется при втором задании sqlData.вот код:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime

copyData = DAG(
    dag_id='copyData',
    schedule_interval='@once',
    start_date=datetime(2019,1,1)
)


printHelloBash = BashOperator(
    task_id = "print_hello_Bash",
    bash_command = 'echo "Lets copy some data"',
    dag = copyData
)

mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
                       task_id="select_some_data",
                       mssql_conn_id=mssqlConnection,
                       database="WideWorldImporters",
                       dag = copyData,
                       depends_on_past=True
          )

queryDataSuccess = BashOperator(
    task_id = "confirm_data_queried",
    bash_command = 'echo "We queried data!"',
    dag = copyData
)

printHelloBash >> sqlData >> queryDataSuccess

Первоначально ошибка была:

*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3  
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding  
Traceback (most recent call last):  
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
    _fernet = Fernet(fernet_key.encode('utf-8'))  
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
    key = base64.urlsafe_b64decode(key)  
  File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
    return b64decode(s)  
  File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*

Я заметил, что это связано с криптографией, и я пошел вперед и запустил pip install cryptography и pip install airflow[crytpo], где оба возвращали одинаковые результаты, сообщая мне, что требование уже выполнено.Наконец, я нашел что-то, что говорит, что мне просто нужно сгенерировать fernet_key.Ключ по умолчанию в моем файле airflow.cfg был fernet_key = $FERNET_KEY.Итак, из клика в контейнере я запустил:

python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

и получил код, который я заменил $FERNET_KEY на.Я перезапустил контейнер и снова запустил dag, и теперь моя ошибка:

[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -   
Traceback (most recent call last):  
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
    h.verify(data[-32:])  
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
    ctx.verify(signature)  
  File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
    raise InvalidSignature("Signature did not match digest.")  
cryptography.exceptions.InvalidSignature: Signature did not match digest.

Какая из первоначальных проверок крипто-документа имеет какое-то отношение к совместимости?

Я нахожусь напотерял сейчас и решил, что я задам этот вопрос, чтобы увидеть, могу ли я пойти по неверному пути в решении этого.Любая помощь будет принята с благодарностью, так как Airflow выглядит потрясающе.

1 Ответ

0 голосов
/ 25 февраля 2019

Благодаря некоторому побочному общению от @Tomasz я наконец-то заставил свою DAG работать.Он порекомендовал мне попробовать использовать docker-compose, который также указан в репозитории puckel / docker-airflow github.В итоге я использовал файл docker-compose-LocalExecutor.yml вместо Celery Executor.Было небольшое устранение неполадок и больше настроек, которые мне пришлось пройти.Для начала я взял свой существующий контейнер MSSQL, в котором был образец базы данных, и превратил его в изображение, используя docker commit mssql_container_name.Единственная причина, по которой я это сделал, - это сэкономить время на восстановление резервной копии базы данных;вы всегда можете скопировать резервные копии в контейнер и восстановить их позже, если хотите.Затем я добавил свой новый образ в существующий файл docker-compose-LocalExecutor.yml следующим образом:

version: '2.1'
services:
    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow

    mssql:
        image: dw:latest
        ports:
            - "1433:1433"

    webserver:
        image: puckel/docker-airflow:1.10.2
        restart: always
        depends_on:
            - postgres
            - mssql
        environment:
            - LOAD_EX=n
            - EXECUTOR=Local
        #volumes:
            #- ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

Имейте в виду, dw - это то, что я назвал новым образом, основанным наконтейнера mssql.Затем я переименовал файл в docker-compose.yml , чтобы я мог легко запустить docker-compose up (не уверен, есть ли команда, указывающая непосредственно на другой файл YAML).Когда все было готово, я перешел к интерфейсу Airflow и настроил соединение.Примечание: , поскольку вы используете docker-compose, вам не нужно знать IP-адрес других контейнеров, поскольку они используют обнаружение службы DNS, о котором я узнал о здесь .Затем, чтобы проверить соединение, я пошел в Data Profiling, чтобы выполнить специальный запрос, но соединения там не было.Это связано с тем, что в образе puckel / docker-airflow не установлен pymssql .Так что просто зайдите в контейнер docker exec -it airflow_webserver_container bash и установите его pip install pymssql --user.Выйдите из контейнера и перезапустите все службы, используя docker-compose restart.Через минуту все заработало.Мое соединение обнаружилось в специальном запросе, и я смог успешно выбрать данные.Наконец, я включил DAG, планировщик поднял его, и все прошло успешно!Супер облегчение после нескольких недель поиска в Google.Спасибо @ y2k-shubham за помощь и некоторую огромную благодарность @Tomasz, к которому я первоначально обратился после его удивительного и тщательного поста об Airflow в субреддите r / datascience.

...