Поток воздуха - Сломанный DAG - Время ожидания - PullRequest
0 голосов
/ 24 мая 2018

У меня есть группа доступности базы данных, которая выполняет функцию, которая подключается к базе данных Postgres, удаляет содержимое таблицы и затем вставляет новый набор данных.

Я пытаюсь сделать это на своем локальном компьютере и вижу, что при попытке запустить веб-сервер требуется много времени для подключения, и в большинстве случаев это не удается.Однако, как часть процесса подключения, он, похоже, выполняет запросы из серверной части.Поскольку у меня есть функция удаления, я вижу, как данные удаляются из таблицы (в основном одна из функций выполняется), даже если я не запланировал сценарий или не запустил его вручную.Может ли кто-нибудь посоветовать, что я делаю не так в этом.

В пользовательском интерфейсе появляется одна ошибка:

Сломанный DAG: [/Users/user/airflow/dags/dwh_sample23.py] Тайм-аут

Также см. I рядом с идентификатором dag в пользовательском интерфейсе, который говорит, что T его DAG недоступен в объекте DAG веб-сервера. Ниже приведен код, который я использую:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 21),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
    ''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
global dwh_connection
try:
    dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
    print("I am unable to connect to the database.")
print('Success')
return(dwh_connection)

def tbl1_del():
''' This function takes clears all rows from tbl1 '''
cur = dwh_connection.cursor()
cur.execute("""DELETE FROM tbl1;""")
dwh_connection.commit()


def pop_tbl1():
''' This function populates all rows in tbl1 '''
cur = dwh_connection.cursor()
cur.execute(""" INSERT INTO tbl1
select id,name,price from tbl2;""")
dwh_connection.commit()



db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()

##########################################


t1 = BashOperator(
task_id='DB_Connect',
python_callable=db_login(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)

t2 = BashOperator(
task_id='del',
python_callable=tbl1_del(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)


t3 = BashOperator(
task_id='populate',
python_callable=pop_tbl1(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)

Может ли кто-нибудь помочь?Спасибо.

1 Ответ

0 голосов
/ 24 мая 2018

Вместо использования BashOperator вы можете использовать PythonOperator и вызывать db_login(), tbl1_del(), pop_tbl1() в PythonOperator

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 21),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
    ''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
global dwh_connection
try:
    dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
    print("I am unable to connect to the database.")
print('Success')
return(dwh_connection)

def tbl1_del():
''' This function takes clears all rows from tbl1 '''
cur = dwh_connection.cursor()
cur.execute("""DELETE FROM tbl1;""")
dwh_connection.commit()


def pop_tbl1():
''' This function populates all rows in tbl1 '''
cur = dwh_connection.cursor()
cur.execute(""" INSERT INTO tbl1
select id,name,price from tbl2;""")
dwh_connection.commit()



db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()

##########################################


t1 = PythonOperator(
task_id='DB_Connect',
python_callable=db_login(),
dag=dag)

t2 = PythonOperator(
task_id='del',
python_callable=tbl1_del(),
dag=dag)


t3 = PythonOperator(
task_id='populate',
python_callable=pop_tbl1(),
dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...