Google Cloud Composer и Google Cloud SQL - PullRequest
0 голосов
/ 03 мая 2018

Какие у нас есть способы подключения к экземпляру Google Cloud SQL (MySQL) из недавно представленного Google Cloud Composer? Намерение состоит в том, чтобы получить данные из экземпляра Cloud SQL в BigQuery (возможно, с промежуточным этапом через Cloud Storage).

  1. Может ли облачный SQL-прокси каким-либо образом быть представлен на модулях в кластере Kubernetes, где размещается Composer?

  2. Если нет, можно ли подключить прокси-сервер Cloud SQL с помощью Kubernetes Service Broker? -> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker

  3. Следует ли использовать Airflow для планирования и вызова команд API GCP, таких как 1) экспорт таблицы mysql в облачное хранилище 2) чтение экспорта mysql в bigquery?

  4. Возможно, есть другие методы, которые мне не хватает, чтобы сделать это

Ответы [ 5 ]

0 голосов
/ 19 июля 2019

Добавление среднего поста в комментариях от @Leo на верхний уровень https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53. После того, как вы прочитаете эту статью и получите настройку службы, вы можете подключиться из группы DAG с помощью SQLAlchemy, например:

import os
from datetime import datetime, timedelta
import logging

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"

default_args = {
    'start_date': datetime(2019, 7, 16)
}


def connect_to_cloud_sql():
    '''
        Create a connection to CloudSQL
    :return:
    '''
    import sqlalchemy
    try:
        PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
        logger.info("DB URL", PROXY_DB_URL)
        engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
        for result in engine.execute("SELECT NOW() as now"):
            logger.info(dict(result))
    except Exception:
        logger.exception("Unable to interact with CloudSQL")


dag = DAG(
    dag_id="example_sqlalchemy",
    default_args=default_args,
    # schedule_interval=timedelta(minutes=5),
    catchup=False  # If you don't set this then the dag will run according to start date
)


t1 = PythonOperator(
    task_id="example_sqlalchemy",
    python_callable=connect_to_cloud_sql,
    dag=dag
)


if __name__ == "__main__":
    connect_to_cloud_sql()
0 голосов
/ 21 мая 2019

«Облачный прокси-сервер SQL обеспечивает безопасный доступ к вашим экземплярам Cloud SQL второго поколения без необходимости внесения в белый список IP-адресов или настройки SSL». - Документы Google CloudSQL-Proxy

CloudSQL Proxy, по-видимому, является рекомендуемым способом подключения к CloudSQL выше всех остальных. Таким образом, в Composer, начиная с версии 1.6.1, мы можем создать новый модуль Kubernetes Pod для запуска образа gcr.io/cloudsql-docker/gce-proxy:latest, предоставить его через службу, а затем создать соединение в Composer для использования в операторе.

Для настройки:

  • Следуйте Документация Google

  • Проверьте соединение, используя информацию от Средняя почта Арика

    • Убедитесь, что пакет был создан kubectl get pods --all-namespaces

    • Убедитесь, что служба была создана kubectl get services --all-namespaces

    • Перейти в рабочий узел kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash

      • Проверка соединения mysql mysql -u composer -p --host <service-name>.default.svc.cluster.local

Примечания:

0 голосов
/ 18 июня 2018

Здесь, в ответе Хоффа на похожий вопрос, вы можете найти ссылку на то, как Wepay синхронизирует его каждые 15 минут с помощью оператора воздушного потока.

Из указанного ответа:

Посмотрите, как это делает WePay:

Оператор MySQL to GCS выполняет запрос SELECT для MySQL Таблица. SELECT извлекает все данные, которые больше (или равны) последним высокий водяной знак. Верхний водяной знак является либо первичным ключом таблица (если таблица только для добавления) или метка времени изменения столбец (если таблица получает обновления). Опять же, оператор SELECT также возвращается назад во времени (или строки), чтобы поймать потенциально упал строки из последнего запроса (из-за проблем, упомянутых выше).

С помощью Airflow им удается синхронизировать BigQuery с MySQL. база данных каждые 15 минут.

0 голосов
/ 06 февраля 2019

У нас была такая же проблема, но с экземпляром Postgres. Вот что мы сделали и заставили его работать:

  • создать развертывание sqlproxy в кластере Kubernetes, где проходит поток воздуха. Это была копия существующего airflow-sqlproxy, используемого соединением airflow_db по умолчанию со следующими изменениями в файле развертывания:

    • заменить все экземпляры airflow-sqlproxy новым именем прокси
    • отредактируйте под 'spec: template: spec: container: command: -instances', замените имя существующего экземпляра новым экземпляром, к которому мы хотим подключиться
  • создать службу kubernetes, снова как копию существующей службы airflow-sqlproxy со следующими изменениями:

    • заменить все экземпляры airflow-sqlproxy новым именем прокси
    • в разделе «spec: ports» измените на соответствующий порт (мы использовали 5432 для экземпляра Postgres)
  • в интерфейсе воздушного потока добавьте соединение типа Postgres с хостом, установленным для вновь созданного имени службы.

0 голосов
/ 18 мая 2018

Вы можете следовать этим инструкциям , чтобы запустить новый экземпляр прокси-сервера Cloud SQL в кластере.

re # 3: Звучит как хороший план. Насколько мне известно, не существует оператора Cloud SQL для BigQuery, поэтому вам придется сделать это в два этапа, как вы описали.

...