Я хочу создать динамически созданную группу доступности базы данных из запроса к таблице базы данных.Когда я пытаюсь создать динамически создаваемую группу обеспечения доступности баз данных из диапазона точного числа или на основе доступного объекта в настройках воздушного потока, это удается.Однако, когда я пытаюсь использовать PostgresHook и создать группу обеспечения доступности баз данных для каждой строки в моей таблице, я вижу новую группу обеспечения доступности баз данных, генерируемую всякий раз, когда я добавляю новую строку в таблицу.Однако оказалось, что я не могу щелкнуть по вновь созданной группе DAG на интерфейсе веб-сервера airflow.Для большего контекста я использую Google Cloud Composer.Я уже выполнил действия, указанные в DAG-файлах, которые не активируются на веб-сервере Google Cloud Composer, но работают нормально на локальном Airflow .Однако он все еще не работает для моего случая.
Вот мой код
from datetime import datetime, timedelta
from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os
default_args = {
"owner": "debug",
"depends_on_past": False,
"start_date": datetime(2018, 10, 17),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
def create_dag(dag_id,
schedule,
default_args):
def hello_world_py(*args):
print 'Hello from DAG: {}'.format(dag_id)
dag = DAG(dag_id,
schedule_interval=timedelta(days=1),
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id=dag_id,
python_callable=hello_world_py,
dag_id=dag_id)
return dag
dag = DAG("dynamic_yolo_pg_", default_args=default_args,
schedule_interval=timedelta(hours=1))
"""
Bahavior:
Create an exact DAG which in turn will create it's own file
https://www.astronomer.io/guides/dynamically-generating-dags/
"""
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
cursor = conn.cursor(cursor_factory=NamedTupleCursor)
cursor.execute("SELECT * FROM airflow_test_command;")
commands = cursor.fetchall()
for command in commands:
dag_id = command.id
schedule = timedelta(days=1)
id = "dynamic_yolo_" + str(dag_id)
print id
globals()[id] = create_dag(id,
schedule,
default_args)
Best,