Я прочитал документацию для создания Airflow Connection через переменную среды и использую Airflow v1.10.6 с Python3 .5 в Debian9.
Связанная документация выше показывает пример соединения S3 s3://accesskey:secretkey@S3
. После этого я определил следующую переменную среды:
AIRFLOW_CONN_AWS_S3=s3://#MY_ACCESS_KEY#:#MY_SECRET_ACCESS_KEY#@S3
И следующую функцию
def download_file_from_S3_with_hook(key, bucket_name):
"""Get file contents from S3"""
hook = airflow.hooks.S3_hook.S3Hook('aws_s3')
obj = hook.get_key(key, bucket_name)
contents = obj.get()['Body'].read().decode('utf-8')
return contents
Однако, когда Я вызываю эту функцию и получаю следующую ошибку:
Using connection to: id: aws_s3.
Host: #MY_ACCESS_KEY#,
Port: None,
Schema: #MY_SECRET_ACCESS_KEY#,
Login: None,
Password: None,
extra: {}
ERROR - Unable to locate credentials
Похоже, что когда я форматирую URI в соответствии с документацией Airflow, он использует ключ доступа в качестве хоста и секретный ключ доступа в качестве схемы.
Он явно читает переменную окружения, так как имеет правильный conn_id
. Он также имеет правильные значения для моего ключа доступа и секрета, он просто разбирает его под неправильным полем.
Когда я устанавливаю соединение в пользовательском интерфейсе, функция работает, если я установил Login
на свой ключ доступа и Password
моему токену. Так как же неправильно форматировать URI переменной среды?