Воздушный поток - Использование Redshift Operator для загрузки данных из Postgres в S3 FAILS - PullRequest
0 голосов
/ 03 мая 2020

Поскольку не существует способов загрузки данных из Postgres в S3, я пытаюсь использовать для этого оператор Redshift, но я получаю следующую ошибку:

[2020-05-03 18:53:07,359] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,368] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-05-03 18:53:07,368] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,385] {taskinstance.py:900} INFO - Executing <Task(RedshiftToS3Transfer): postgres_to_S3> on 2020-01-01T00:00:00+00:00
[2020-05-03 18:53:07,389] {standard_task_runner.py:53} INFO - Started process 124750 to run task
[2020-05-03 18:53:07,445] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [running]> erdc-Virtual-Machine
[2020-05-03 18:53:07,487] {redshift_to_s3_operator.py:124} INFO - Executing UNLOAD command...
[2020-05-03 18:53:07,491] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,490] {base_hook.py:87} INFO - Using connection to: id: pg_local. Host: 192.168.0.199, Port: 5432, Schema: AdventureWorks, Login: myuser, Password: mypass, extra: None
[2020-05-03 18:53:07,498] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,498] {dbapi_hook.py:174} INFO - 
                    UNLOAD ('SELECT * FROM AdventureWorks.dimgeography')
                    TO 's3://mybucket/S3_GEOGRAPHY/dimgeography_'
                    with credentials
                    'aws_access_key_id=mykey;aws_secret_access_key=mysecretkey'
                    ;

**[2020-05-03 18:53:07,499] {taskinstance.py:1145} ERROR - syntax error at or near "UNLOAD"
LINE 2:                     UNLOAD ('SELECT * FROM AdventureWorks.di...
                            ^
Traceback (most recent call last):**
  File "/home/erdc/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/erdc/.local/lib/python3.6/site-packages/airflow/operators/redshift_to_s3_operator.py", line 125, in execute
    postgres_hook.run(unload_query, self.autocommit)

Я использую это как мой DAG:

from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer
from datetime import datetime, timedelta
from airflow.operators import DummyOperator
from airflow import DAG

default_args = {
    'owner': 'me',
    'start_date': datetime(2020,1,1),
    'retry_delay': timedelta(minutes=5)
}

# Using the context manager allows not to duplicate the dag parameter in each operation
with DAG('postgres_to_S3', default_args=default_args, schedule_interval='@once') as dag:


    start_task = DummyOperator(
        task_id='dummy_start'
    )


    unload_to_S3 = RedshiftToS3Transfer(
        task_id='postgres_to_S3',
        schema='AdventureWorks',
        table='dimgeography',
        s3_bucket='my_bucket',
        s3_key='S3_GEOGRAPHY',
        redshift_conn_id='pg_local',
        aws_conn_id='my_aws_conn'
#       ,dag=dag
    )

# Using arrows to set dependencies between tasks

start_task >> unload_to_S3

Я точно знаю, что соединения postgres и S3 работают.

Как вы думаете, этот обходной путь может сработать или мне нужно создать оператора самостоятельно?

Спасибо!

1 Ответ

1 голос
/ 04 мая 2020

Оператор (как видно из журналов) основан на выполнении запроса Unload. Postgres не имеет такой функциональности, поэтому не будет работать.

Вы можете взять код MySQLToS3Operator здесь https://github.com/apache/airflow/blob/9788d3195bedbeaf5e1fbb501c064dab8f5e7803/airflow/operators/mysql_to_s3_operator.py и внести некоторые изменения, чтобы изменить mysql для postgres

Кстати, если вы используете with Dag, вам не нужно явно передавать аргумент dag=dag любой задаче

...