У меня есть задание воздушного потока, которое подключается к Postgres db и обновляет некоторые значения.
Ниже приведен код, и он отлично работает.
Однако, тестируя весь мой дагс, я много раз обновлял записи в производстве.
(Так как мне приходилось запускать несколько раз, чтобы добраться до финальной версии кода)
Сейчас я собираюсь написать похожую ошибку, но не хочу обновлять таблицы в процессе тестирования.
Есть ли прямой способ запустить даг только для тестирования.
Например, поставить параметр, такой как dry_run = true или что-то подобное.
Код Дага:
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
import os
from datetime import datetime, timedelta
import numpy as np
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
dag = DAG("Score_update", default_args=default_args, schedule_interval='3,33 * * * *',template_searchpath = ['/root/airflow/sql/'],max_active_runs=1)
updateOp = PostgresOperator(
task_id='Refresh_DailyScore',
postgres_conn_id='postgress_sophi',
sql='Score.sql',
params={'name':'alpa'},
dag=dag)
updateOp