Воздушный поток - скрипт выполняется во время инициализации - PullRequest
0 голосов
/ 25 мая 2018

У меня есть скрипт Airflow, который, как ожидается, вставит данные из таблицы_2 в таблицу_1.Как часть процесса инициализации воздушного потока, я вижу, что функция вставки продолжает работать в фоновом режиме, даже если я не запустил ее или не запланировал.Мне интересно, что не так в скрипте, который заставляет его срабатывать автоматически.Что мне нужно изменить в приведенном ниже сценарии, чтобы он не выполнял команду как часть процесса инициализации.

## Library Imports
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 25),
'email': ['admin@mail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG('sample', default_args=default_args)


#######################

def db_login():
    global db_con
try:
    db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
    print("I am unable to connect to the database.")
print('Connection Task Complete: Connected to DB')
return(db_con)


#######################

def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into table_1 select id,name,status from table_2 limit 2 ;""")
    db_con.commit()
    print('ETL Task Complete: Inserting data into table_1')


db_login()
insert_data()
db_con.close()

##########################################


t1 = BashOperator(
task_id='db_con',
python_callable=db_login(),
bash_command='python3 ~/airflow/dags/sample.py',
email_on_failure=True,
email=['admin@mail.com'],
dag=dag)

t2 = BashOperator(
task_id='insert',
python_callable=insert_data(),
bash_command='python3 ~/airflow/dags/sample.py',
email_on_failure=True,
email=['admin@mail.com'],
dag=dag)


t1.set_downstream(t2)

Может ли кто-нибудь помочь в этом.Спасибо.

Обновлен код:

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io



default_args = {
'owner': 'admin',
#'depends_on_past': False,
'start_date': datetime(2018, 5, 25),
 'email': ['admin@mail.com'],
 'email_on_failure': True,
 'email_on_retry': True,
 'retries': 1,
 'retry_delay': timedelta(minutes=1), }

dag = DAG('sample', default_args=default_args, catchup=False, schedule_interval="@once")


def db_login():
    global db_con
    try:
        db_con = psycopg2.connect(
        " dbname = 'db' user = 'user' password = 'password' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection success')
    return (db_con)

def insert_data():
    cur = db_con.cursor()
    cur.execute("""insert into table_1 select id,name,status from table_2 limit 2;""")
    db_con.commit()
    print('ETL Task Complete: Inserting data into table_1')

def load_etl():
    db_login()
    insert_data()
    dwh_connection.close()

#Function to execute the query
load_etl()

t1 = BashOperator(
    task_id='db_connection',
    python_callable=load_etl(),
    bash_command='python3 ~/airflow/dags/sample.py',
    email_on_failure=True,
    email=['admin@mail.com'],
    dag=dag)

#t2 = BashOperator(
#task_id='ops_load_del',
#python_callable=insert_data(),
#bash_command='python3 ~/airflow/dags/sample.py',
#email_on_failure=True,
#email=['admin@mail.com'],
#dag=dag)

t1
#t1.set_downstream(t2)

1 Ответ

0 голосов
/ 25 мая 2018

Если вы посмотрите на свой DAG с точки зрения Python, отступы вызывают несколько мыслей.

Сначала попробуйте выполнить DAG только с python name-of-dag.py.Да, не используйте команду airflow.Это также делают некоторые части Airflow, чтобы проверить, что делать.

Теперь, если какой-то код выполняется, это может быть связано с намерением.

Анализfunction

Здесь отступ кажется неправильным:

def db_login (): global db_con try: db_con = psycopg2.connect ("dbname = 'db' user = 'user' password ='pass' host = 'hostname' port = '5439' sslmode = 'require' ") за исключением: print (" Я не могу подключиться к базе данных. ") print ('Подключение выполнено: соединение с БД') return (db_con)

Это должно быть:

def db_login():
    global db_con
    try:
        db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Connection Task Complete: Connected to DB')
    return(db_con)

В противном случае всегда будет выполняться код слева.

Также: global переменные будутне должно быть доступно в других методах в Airflow!Для обмена подключениями используйте, например, Airflows XCOM: https://airflow.apache.org/concepts.html#xcoms

Вызовы функций непосредственно в DAG

Кроме того, по какой-то причине, незаметно для меня, вы хотитевыполнять некоторые функции полностью вне контроля Airflow , но при каждом выполнении.

db_login() 
insert_data()
db_con.close()

Этот код будет выполняться каждый раз, когда DAG вызывается (что можеточень много) и может полностью отличаться от желаемого расписания.

Если вы хотите, чтобы этот код присутствовал в целях тестирования, вы можете поместить его в основной вызов:

if __name__ == '__main__':
    db_login() 
    insert_data()
    db_con.close()

Даже если вы это сделаете - операция закрытия доступна только в этом рабочем процессе, но не в группе обеспечения доступности баз данных.Нет задачи закрыть соединение.

Поскольку вы используете PythonOperator, возможно, было бы разумно создать небольшую def для этого и иметь только одну задачу, которая вызывает эту def:

def load_etl():
    db_login() 
    insert_data()
    db_con.close()

TL / DR: Устранить все ошибки отступов, чтобы при вызове файла исключительно с помощью Python код не выполнялся.

EDIT

Это также означает, что никакие вызовы функций не находятся вне задачи или определения.Эта строка

#Function to execute the query
load_etl()

будет выполнена, поскольку она не является частью задачи или определения.Это должно быть удалено.Тогда он должен работать, поскольку вызов функции является частью задачи.

Поскольку эта функция является функцией Python, вы должны использовать PythonOperator и ее параметр python_callable=load_etl (примечание: в конце нет скобок)линии)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...