Я новичок в облачном композиторе и хочу выполнить один запрос PostgreSQL SELECT, используя хук gcp_cloud_sql в потоке облачного композитора. Я пытался с CloudSqlQueryOperator, но он не работает с запросами SELECT.
Я хочу создать группы обеспечения доступности баз данных на основе результатов, полученных из этого запроса select. Однако я не могу создать даже простое соединение для этого запроса SELECT.
from six.moves.urllib.parse import quote_plus
import airflow
from airflow import models
from airflow.contrib.operators.gcp_sql_operator import (
CloudSqlQueryOperator
)
from datetime import date, datetime, timedelta
GCP_PROJECT_ID = "adtech-dev"
GCP_REGION = "<my cluster zone>"
GCSQL_POSTGRES_INSTANCE_NAME_QUERY = "testpostgres"
GCSQL_POSTGRES_DATABASE_NAME = ""
GCSQL_POSTGRES_USER = "<PostgreSQL User Name>"
GCSQL_POSTGRES_PASSWORD = "**********"
GCSQL_POSTGRES_PUBLIC_IP = "0.0.0.0"
GCSQL_POSTGRES_PUBLIC_PORT = "5432"
rule_query = "select r.id from rules r where r.id = 1"
postgres_kwargs = dict(
user=quote_plus(GCSQL_POSTGRES_USER),
password=quote_plus(GCSQL_POSTGRES_PASSWORD),
public_port=GCSQL_POSTGRES_PUBLIC_PORT,
public_ip=quote_plus(GCSQL_POSTGRES_PUBLIC_IP),
project_id=quote_plus(GCP_PROJECT_ID),
location=quote_plus(GCP_REGION),
instance=quote_plus(GCSQL_POSTGRES_INSTANCE_NAME_QUERY),
database=quote_plus(GCSQL_POSTGRES_DATABASE_NAME)
)
default_args = {
'owner': 'airflow',
'start_date': datetime(2018, 5, 31),
'email': ['aniruddha.dwivedi@xyz.com'],
'email_on_failure': True,
'email_on_retry': False,
'depends_on_past': False,
'catchup': False,
'retries': 3,
'retry_delay': timedelta(minutes=10),
}
os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
"gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
"database_type=postgres&" \
"project_id={project_id}&" \
"location={location}&" \
"instance={instance}&" \
"use_proxy=True&" \
"sql_proxy_use_tcp=True".format(**postgres_kwargs)
connection_names = [
"proxy_postgres_tcp"
]
tasks = []
with models.DAG(
dag_id='example_gcp_sql_query',
default_args=default_args,
schedule_interval=None
) as dag:
prev_task = None
for connection_name in connection_names:
task = CloudSqlQueryOperator(
gcp_cloudsql_conn_id=connection_name,
task_id="example_gcp_sql_task_" + connection_name,
sql=rule_query
)
tasks.append(task)
if prev_task:
prev_task >> task
prev_task = task