Since there is no built-in operator for loading data from Postgres into S3, I am trying to use the Redshift-to-S3 operator for this, but I get the following error:
[2020-05-03 18:53:07,359] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [queued]>
[2020-05-03 18:53:07,368] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,368] {taskinstance.py:880} INFO - Starting attempt 1 of 1
[2020-05-03 18:53:07,368] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-05-03 18:53:07,385] {taskinstance.py:900} INFO - Executing <Task(RedshiftToS3Transfer): postgres_to_S3> on 2020-01-01T00:00:00+00:00
[2020-05-03 18:53:07,389] {standard_task_runner.py:53} INFO - Started process 124750 to run task
[2020-05-03 18:53:07,445] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: postgres_to_S3.postgres_to_S3 2020-01-01T00:00:00+00:00 [running]> erdc-Virtual-Machine
[2020-05-03 18:53:07,487] {redshift_to_s3_operator.py:124} INFO - Executing UNLOAD command...
[2020-05-03 18:53:07,491] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,490] {base_hook.py:87} INFO - Using connection to: id: pg_local. Host: 192.168.0.199, Port: 5432, Schema: AdventureWorks, Login: myuser, Password: mypass, extra: None
[2020-05-03 18:53:07,498] {logging_mixin.py:112} INFO - [2020-05-03 18:53:07,498] {dbapi_hook.py:174} INFO -
UNLOAD ('SELECT * FROM AdventureWorks.dimgeography')
TO 's3://mybucket/S3_GEOGRAPHY/dimgeography_'
with credentials
'aws_access_key_id=mykey;aws_secret_access_key=mysecretkey'
;
[2020-05-03 18:53:07,499] {taskinstance.py:1145} ERROR - syntax error at or near "UNLOAD"
LINE 2: UNLOAD ('SELECT * FROM AdventureWorks.di...
^
Traceback (most recent call last):
File "/home/erdc/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/erdc/.local/lib/python3.6/site-packages/airflow/operators/redshift_to_s3_operator.py", line 125, in execute
postgres_hook.run(unload_query, self.autocommit)
This is the DAG I am using:
from airflow.operators.redshift_to_s3_operator import RedshiftToS3Transfer
from datetime import datetime, timedelta
from airflow.operators import DummyOperator
from airflow import DAG

default_args = {
    'owner': 'me',
    'start_date': datetime(2020, 1, 1),
    'retry_delay': timedelta(minutes=5)
}

# Using the context manager avoids repeating the dag parameter on every operator
with DAG('postgres_to_S3', default_args=default_args, schedule_interval='@once') as dag:

    start_task = DummyOperator(
        task_id='dummy_start'
    )

    unload_to_S3 = RedshiftToS3Transfer(
        task_id='postgres_to_S3',
        schema='AdventureWorks',
        table='dimgeography',
        s3_bucket='my_bucket',
        s3_key='S3_GEOGRAPHY',
        redshift_conn_id='pg_local',
        aws_conn_id='my_aws_conn'
        # ,dag=dag
    )

    # Using arrows to set dependencies between tasks
    start_task >> unload_to_S3
I know for sure that both the Postgres and S3 connections work.
Do you think this workaround can be made to work, or do I need to write the operator myself?
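If I do have to write it myself, this is the kind of thing I had in mind as a replacement for the RedshiftToS3Transfer task inside the same with DAG block: a PythonOperator that uses PostgresHook.copy_expert to dump the table to a local CSV via Postgres' COPY, and S3Hook.load_file to upload it. This is an untested sketch; the /tmp staging path and the output key name are placeholders, the connection ids and bucket are the ones from the DAG above.

from airflow.hooks.postgres_hook import PostgresHook
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

def dump_postgres_table_to_s3():
    # Dump the table to a local CSV with Postgres' own COPY command,
    # then upload that file to S3.
    pg_hook = PostgresHook(postgres_conn_id='pg_local')
    s3_hook = S3Hook(aws_conn_id='my_aws_conn')
    local_path = '/tmp/dimgeography.csv'  # placeholder staging path

    pg_hook.copy_expert(
        "COPY AdventureWorks.dimgeography TO STDOUT WITH CSV HEADER",
        local_path,
    )
    s3_hook.load_file(
        filename=local_path,
        key='S3_GEOGRAPHY/dimgeography.csv',
        bucket_name='my_bucket',
        replace=True,
    )

unload_to_S3 = PythonOperator(
    task_id='postgres_to_S3',
    python_callable=dump_postgres_table_to_s3,
)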
Thanks!