Если вы посмотрите на свой DAG с точки зрения Python, отступы вызывают несколько мыслей.
Сначала попробуйте выполнить DAG только с python name-of-dag.py
.Да, не используйте команду airflow
.Это также делают некоторые части Airflow, чтобы проверить, что делать.
Теперь, если какой-то код выполняется, это может быть связано с намерением.
Анализfunction
Здесь отступ кажется неправильным:
def db_login (): global db_con try: db_con = psycopg2.connect ("dbname = 'db' user = 'user' password ='pass' host = 'hostname' port = '5439' sslmode = 'require' ") за исключением: print (" Я не могу подключиться к базе данных. ") print ('Подключение выполнено: соединение с БД') return (db_con)
Это должно быть:
def db_login():
global db_con
try:
db_con = psycopg2.connect(" dbname = 'db' user = 'user' password = 'pass' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
print("I am unable to connect to the database.")
print('Connection Task Complete: Connected to DB')
return(db_con)
В противном случае всегда будет выполняться код слева.
Также: global переменные будутне должно быть доступно в других методах в Airflow!Для обмена подключениями используйте, например, Airflows XCOM
: https://airflow.apache.org/concepts.html#xcoms
Вызовы функций непосредственно в DAG
Кроме того, по какой-то причине, незаметно для меня, вы хотитевыполнять некоторые функции полностью вне контроля Airflow , но при каждом выполнении.
db_login()
insert_data()
db_con.close()
Этот код будет выполняться каждый раз, когда DAG вызывается (что можеточень много) и может полностью отличаться от желаемого расписания.
Если вы хотите, чтобы этот код присутствовал в целях тестирования, вы можете поместить его в основной вызов:
if __name__ == '__main__':
db_login()
insert_data()
db_con.close()
Даже если вы это сделаете - операция закрытия доступна только в этом рабочем процессе, но не в группе обеспечения доступности баз данных.Нет задачи закрыть соединение.
Поскольку вы используете PythonOperator
, возможно, было бы разумно создать небольшую def для этого и иметь только одну задачу, которая вызывает эту def:
def load_etl():
db_login()
insert_data()
db_con.close()
TL / DR: Устранить все ошибки отступов, чтобы при вызове файла исключительно с помощью Python код не выполнялся.
EDIT
Это также означает, что никакие вызовы функций не находятся вне задачи или определения.Эта строка
#Function to execute the query
load_etl()
будет выполнена, поскольку она не является частью задачи или определения.Это должно быть удалено.Тогда он должен работать, поскольку вызов функции является частью задачи.
Поскольку эта функция является функцией Python, вы должны использовать PythonOperator
и ее параметр python_callable=load_etl
(примечание: в конце нет скобок)линии)