У меня на сервере ec2 работает airflow, который я должен использовать прокси для всех внешних запросов https. Следующая функция работает с dag.
import boto3
from botocore.config import Config
def get_files(**context):
s3 = boto3.client('s3',config=Config(proxies={'https': 'mycorpsproxy.com:3128'}))
s3_bucket = "some_bucket"
paginator = s3.get_paginator("list_objects")
page_iterator = paginator.paginate(
Bucket=s3_bucket, Prefix="folder_like_prefix/"
current_files = []
for page in page_iterator:
if 'Contents' not in page:
continue
for object in page['Contents']:
current_files.append(object['Key'])
return current_files
Но я не хочу жестко кодировать этот прокси в каждом dag. Я хотел бы использовать крючок s3, но не могу найти, что поместить в дополнительное поле в соединениях воздушного потока, чтобы оно работало.
Я пробовал несколько вариантов, например
![Airflow connection web interface prefilled out with S3](https://i.imgur.com/Q1VXjmG.png)
но я получаю ошибки json при запуске dag
[2019-11-04 14:08:01,266] {logging_mixin.py:112} INFO - [2019-11-04 14:08:01,266] {connection.py:296} ERROR - Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/airflow/models/connection.py", line 294, in extra_dejson
obj = json.loads(self.extra)
File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
[2019-11-04 14:08:01,267] {logging_mixin.py:112} INFO - [2019-11-04 14:08:01,266] {connection.py:297} ERROR - Failed parsing the json for conn_id s3_airflow