Я довольно новичок в Airflow.Я несколько раз прочитал документацию, разобрался с многочисленными вопросами S / O и множеством случайных статей в Интернете, но пока не решил эту проблему.У меня ощущение, что это что-то супер простое, я делаю неправильно.У меня есть Docker для Windows, и я вытащил образ puckel/docker-airflow
и запустил контейнер с открытыми портами, чтобы я мог получить доступ к интерфейсу с моего хоста.У меня есть другой контейнер, работающий mcr.microsoft.com/mssql/server
, на котором я восстановил образец базы данных WideWorldImporters.Благодаря пользовательскому интерфейсу Airflow я смог успешно создать соединение с этой базой данных и даже запросить его в разделе «Профилирование данных».Проверьте изображения ниже: Создание соединения Успешный запрос к соединению
Так что, пока это работает, мой dag не выполняется при втором задании sqlData
.вот код:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mssql_operator import MsSqlOperator
from datetime import timedelta, datetime
copyData = DAG(
dag_id='copyData',
schedule_interval='@once',
start_date=datetime(2019,1,1)
)
printHelloBash = BashOperator(
task_id = "print_hello_Bash",
bash_command = 'echo "Lets copy some data"',
dag = copyData
)
mssqlConnection = "WWI"
sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",
task_id="select_some_data",
mssql_conn_id=mssqlConnection,
database="WideWorldImporters",
dag = copyData,
depends_on_past=True
)
queryDataSuccess = BashOperator(
task_id = "confirm_data_queried",
bash_command = 'echo "We queried data!"',
dag = copyData
)
printHelloBash >> sqlData >> queryDataSuccess
Первоначально ошибка была:
*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3
[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet
_fernet = Fernet(fernet_key.encode('utf-8'))
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 34, in __init__
key = base64.urlsafe_b64decode(key)
File "/usr/local/lib/python3.6/base64.py", line 133, in urlsafe_b64decode
return b64decode(s)
File "/usr/local/lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding*
Я заметил, что это связано с криптографией, и я пошел вперед и запустил pip install cryptography
и pip install airflow[crytpo]
, где оба возвращали одинаковые результаты, сообщая мне, что требование уже выполнено.Наконец, я нашел что-то, что говорит, что мне просто нужно сгенерировать fernet_key.Ключ по умолчанию в моем файле airflow.cfg был fernet_key = $FERNET_KEY
.Итак, из клика в контейнере я запустил:
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
и получил код, который я заменил $FERNET_KEY
на.Я перезапустил контейнер и снова запустил dag, и теперь моя ошибка:
[2019-02-22 16:22:13,641] {{models.py:1760}} ERROR -
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 106, in _verify_signature
h.verify(data[-32:])
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/primitives/hmac.py", line 69, in verify
ctx.verify(signature)
File "/usr/local/lib/python3.6/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 73, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
Какая из первоначальных проверок крипто-документа имеет какое-то отношение к совместимости?
Я нахожусь напотерял сейчас и решил, что я задам этот вопрос, чтобы увидеть, могу ли я пойти по неверному пути в решении этого.Любая помощь будет принята с благодарностью, так как Airflow выглядит потрясающе.