У меня есть группа доступности базы данных, которая выполняет функцию, которая подключается к базе данных Postgres, удаляет содержимое таблицы и затем вставляет новый набор данных.
Я пытаюсь сделать это на своем локальном компьютере и вижу, что при попытке запустить веб-сервер требуется много времени для подключения, и в большинстве случаев это не удается.Однако, как часть процесса подключения, он, похоже, выполняет запросы из серверной части.Поскольку у меня есть функция удаления, я вижу, как данные удаляются из таблицы (в основном одна из функций выполняется), даже если я не запланировал сценарий или не запустил его вручную.Может ли кто-нибудь посоветовать, что я делаю не так в этом.
В пользовательском интерфейсе появляется одна ошибка:
Сломанный DAG: [/Users/user/airflow/dags/dwh_sample23.py] Тайм-аут
Также см. I рядом с идентификатором dag в пользовательском интерфейсе, который говорит, что T его DAG недоступен в объекте DAG веб-сервера. Ниже приведен код, который я использую:
## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io
# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 21),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG('dwh_sample23', default_args=default_args)
#######################
## Login to DB
def db_login():
''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
global dwh_connection
try:
dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
print("I am unable to connect to the database.")
print('Success')
return(dwh_connection)
def tbl1_del():
''' This function takes clears all rows from tbl1 '''
cur = dwh_connection.cursor()
cur.execute("""DELETE FROM tbl1;""")
dwh_connection.commit()
def pop_tbl1():
''' This function populates all rows in tbl1 '''
cur = dwh_connection.cursor()
cur.execute(""" INSERT INTO tbl1
select id,name,price from tbl2;""")
dwh_connection.commit()
db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()
##########################################
t1 = BashOperator(
task_id='DB_Connect',
python_callable=db_login(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)
t2 = BashOperator(
task_id='del',
python_callable=tbl1_del(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)
t3 = BashOperator(
task_id='populate',
python_callable=pop_tbl1(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)
t1.set_downstream(t2)
t2.set_downstream(t3)
Может ли кто-нибудь помочь?Спасибо.