Моя цель - периодически (раз в день) выгружать данные из Redshift в корзину S3 с помощью Airflow. Мои учетные данные aws для доступа к корзине сохраняются как подключение к соединениям Airflow. Я подтвердил, что они работают, поскольку мне удалось перенести данные с Redshift на S3. Соединение Redshift также сохраняется в соединениях Airflow.
Я пробовал несколько вещей, но ни один из них не работал.
Метод 1: RedshiftToS3Transfer
dag = DAG(dag_id='redshift_s3',
default_args=default_args,
schedule_interval="@once",
catchup=False,
description='testing redshift connection'
)
tuk_tuk = RedshiftToS3Transfer(
task_id='unload_to_S3',
schema='javascript',
table='ad_click',
s3_bucket='airflow-reporting',
s3_key='ad_click',
redshift_conn_id='redshift-db', # redshift database
aws_conn_id='my_s3', # s3 connection
dag=dag,
unload_options=['CSV', 'HEADER']
)
[tuk_tuk]
Этот вид of работал, поскольку папка с именем 'ad_click' была сгенерирована в целевом сегменте, и в папке появилось несколько файлов на основе aws разделения по умолчанию.
Однако есть две проблемы:
Это выгружает всю таблицу! Есть ли возможность указать SQL при использовании RedshiftToS3Transfer
? Я читал документацию по Airflow (https://airflow.apache.org/docs/stable/_api/airflow/operators/redshift_to_s3_operator/index.html?highlight=redshift#airflow .operators.redshift_to_s3_operator.RedshiftToS3Transfer ), но в параметрах нет указания, возможно ли это.
Вторая проблема заключается в том, что когда я запускаю этот DAG во второй раз, я получаю сообщение об ошибке: psycopg2.errors.InternalError_: Specified unload destination on S3 is not empty. Consider using a different bucket / prefix, manually removing the target files in S3, or using the ALLOWOVERWRITE option.
Метод 2: используйте команду PostgresOperator + Unload
s3 = S3_hook.S3Hook('my_s3')
sql_one = """
SELECT * FROM javascript.ad_click;
"""
query = """
UNLOAD ('{select_query}')
TO 's3://{s3_bucket}/{s3_key}'
CREDENTIALS
'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
REGION AS 'us-west-2'
DELIMITER AS ',';
""".format(select_query=sql_one,
s3_bucket='airflow-reporting',
s3_key='ad_click',
access_key=s3.access_key,
secret_key=s3.secret_key)
with DAG(dag_id='unload_with_hook',
schedule_interval = '@once',
default_args=default_args,
catchup=False) as dag:
dump_ad_clicks = PostgresOperator(
task_id='unload_query',
sql=query,
postgres_conn_id='redshift-db'
)
[dump_ad_clicks]
Проблемы
- очевидно, что соединение s3 не имеет атрибутов access_key и secret_key.
- Я попытался заменить ключ доступа и секретный ключ фактическими строками, чтобы диагностировать проблему, но затем получил эту обратную связь в журналах воздушного потока:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py",
line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/postgres_operator.py",
line 65, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py",
line 175, in run
cur.execute(s)
psycopg2.errors.InternalError_: S3ServiceException:The request signature we calculated does not match the signature you provided.
Check your key and signing method.,Status 403,Error
SignatureDoesNotMatch,Rid B12AF06897204FA5,ExtRid
7m7lQ+Zbg2qRGum783YUveN74ckPYQSZM0XA8okBb1Ty78KXa/mQBdcB
На данный момент вид потерян, какой правильный подход. Ограничение состоит в том, что мне не нужен полный дамп - я хочу использовать пользовательский SQL для определения дневных лимитов и динамического назначения ключей файлам.