Динамическое создание группы доступности базы данных на основе строки, доступной для подключения к базе данных - PullRequest
0 голосов
/ 19 октября 2018

Я хочу создать динамически созданную группу доступности базы данных из запроса к таблице базы данных.Когда я пытаюсь создать динамически создаваемую группу обеспечения доступности баз данных из диапазона точного числа или на основе доступного объекта в настройках воздушного потока, это удается.Однако, когда я пытаюсь использовать PostgresHook и создать группу обеспечения доступности баз данных для каждой строки в моей таблице, я вижу новую группу обеспечения доступности баз данных, генерируемую всякий раз, когда я добавляю новую строку в таблицу.Однако оказалось, что я не могу щелкнуть по вновь созданной группе DAG на интерфейсе веб-сервера airflow.Для большего контекста я использую Google Cloud Composer.Я уже выполнил действия, указанные в DAG-файлах, которые не активируются на веб-сервере Google Cloud Composer, но работают нормально на локальном Airflow .Однако он все еще не работает для моего случая.

Вот мой код

from datetime import datetime, timedelta

from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os

default_args = {
  "owner": "debug",
  "depends_on_past": False,
  "start_date": datetime(2018, 10, 17),
  "email": ["airflow@airflow.com"],
  "email_on_failure": False,
  "email_on_retry": False,
  "retries": 1,
  "retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}


def create_dag(dag_id,
           schedule,
           default_args):
def hello_world_py(*args):
    print 'Hello from DAG: {}'.format(dag_id)

dag = DAG(dag_id,
          schedule_interval=timedelta(days=1),
          default_args=default_args)

with dag:
    t1 = PythonOperator(
        task_id=dag_id,
        python_callable=hello_world_py,
        dag_id=dag_id)

return dag


dag = DAG("dynamic_yolo_pg_", default_args=default_args,     
        schedule_interval=timedelta(hours=1))

"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
  dag_id = command.id
  schedule = timedelta(days=1)

  id = "dynamic_yolo_" + str(dag_id)

  print id

  globals()[id] = create_dag(id,
                           schedule,
                           default_args)

Best,

1 Ответ

0 голосов
/ 08 января 2019

Эту проблему можно решить с помощью автономного веб-сервера Airflow, используя шаги, упомянутые в [1].После того, как вы это сделаете, если вы решите добавить аутентификацию перед вашим самоуправляемым веб-сервером, после создания входа ваши BackendServices должны появиться в консоли Google IAP, и вы сможете включить IAP.Если вы хотите получить доступ к воздушному потоку программным способом, вы также можете использовать аутентификацию JWT с использованием служебной учетной записи для вашего управляемого веб-сервера Airflow [2].

[1] https://cloud.google.com/composer/docs/how-to/managing/deploy-webserver

[2]https://cloud.google.com/iap/docs/authentication-howto

...