разгрузка из красного смещения в s3 с использованием воздушного потока - PullRequest
0 голосов
/ 02 августа 2020

Моя цель - периодически (раз в день) выгружать данные из Redshift в корзину S3 с помощью Airflow. Мои учетные данные aws для доступа к корзине сохраняются как подключение к соединениям Airflow. Я подтвердил, что они работают, поскольку мне удалось перенести данные с Redshift на S3. Соединение Redshift также сохраняется в соединениях Airflow.

Я пробовал несколько вещей, но ни один из них не работал.

Метод 1: RedshiftToS3Transfer

dag = DAG(dag_id='redshift_s3',
  default_args=default_args,
  schedule_interval="@once",
  catchup=False,
  description='testing redshift connection'

)

tuk_tuk = RedshiftToS3Transfer(
  task_id='unload_to_S3',
  schema='javascript',
  table='ad_click',
  s3_bucket='airflow-reporting',
  s3_key='ad_click',
  redshift_conn_id='redshift-db', # redshift database
  aws_conn_id='my_s3', # s3 connection
  dag=dag,
  unload_options=['CSV', 'HEADER']
)

[tuk_tuk]

Этот вид of работал, поскольку папка с именем 'ad_click' была сгенерирована в целевом сегменте, и в папке появилось несколько файлов на основе aws разделения по умолчанию.

Однако есть две проблемы:

  1. Это выгружает всю таблицу! Есть ли возможность указать SQL при использовании RedshiftToS3Transfer? Я читал документацию по Airflow (https://airflow.apache.org/docs/stable/_api/airflow/operators/redshift_to_s3_operator/index.html?highlight=redshift#airflow .operators.redshift_to_s3_operator.RedshiftToS3Transfer ), но в параметрах нет указания, возможно ли это.

  2. Вторая проблема заключается в том, что когда я запускаю этот DAG во второй раз, я получаю сообщение об ошибке: psycopg2.errors.InternalError_: Specified unload destination on S3 is not empty. Consider using a different bucket / prefix, manually removing the target files in S3, or using the ALLOWOVERWRITE option.

Метод 2: используйте команду PostgresOperator + Unload

s3 = S3_hook.S3Hook('my_s3')

sql_one = """
SELECT * FROM javascript.ad_click;
    """

query = """
            UNLOAD ('{select_query}')
            TO 's3://{s3_bucket}/{s3_key}'
            CREDENTIALS
            'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
            REGION AS 'us-west-2'
            DELIMITER AS ',';
            """.format(select_query=sql_one,
                        s3_bucket='airflow-reporting',
                        s3_key='ad_click',
                        access_key=s3.access_key,
                        secret_key=s3.secret_key)

with DAG(dag_id='unload_with_hook', 
      schedule_interval = '@once', 
      default_args=default_args,
      catchup=False) as dag:
      
      dump_ad_clicks = PostgresOperator(
            task_id='unload_query',
            sql=query,
            postgres_conn_id='redshift-db'
            )

[dump_ad_clicks]

Проблемы

  1. очевидно, что соединение s3 не имеет атрибутов access_key и secret_key.
  2. Я попытался заменить ключ доступа и секретный ключ фактическими строками, чтобы диагностировать проблему, но затем получил эту обратную связь в журналах воздушного потока:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py",
line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/postgres_operator.py",
line 65, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py",
line 175, in run
    cur.execute(s)
psycopg2.errors.InternalError_: S3ServiceException:The request signature we calculated does not match the signature you provided.
Check your key and signing method.,Status 403,Error
SignatureDoesNotMatch,Rid B12AF06897204FA5,ExtRid
7m7lQ+Zbg2qRGum783YUveN74ckPYQSZM0XA8okBb1Ty78KXa/mQBdcB

На данный момент вид потерян, какой правильный подход. Ограничение состоит в том, что мне не нужен полный дамп - я хочу использовать пользовательский SQL для определения дневных лимитов и динамического назначения ключей файлам.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...