Я пытаюсь создать DAG в Airflow. Поскольку это новый проект, я создал новую папку со всеми DAG, относящимися к продукту. Я получаю, что сломанный объект DAG 'module' не вызывается. Ни идентификатор DAG, ни какая-либо функция не названы в качестве файла, поэтому я не понимаю ошибку 'module'. Должен ли я что-то делать с новой папкой, которую я создал
import pandas as pd
import psycopg2
import datetime
import io
from airflow import DAG
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
# ----------------------------------------------------------------------------------------------------------------------
# Connections
# ----------------------------------------------------------------------------------------------------------------------
def db1_conn():
# Core Credentials
db1_hook = BaseHook.get_connection('db1')
db1_conn = psycopg2.connect(dbname=db1_hook.schema, user=db1_hook.login,
password=db1_hook.password, host=db1_hook.host,
port=db1_hook.port)
return db1_conn
def db2_write():
# Core Write
db2_hook = BaseHook.get_connection('db2-write')
db2_conn = psycopg2.connect(dbname=db2_hook.schema, user=db2_hook.login,
password=db2_hook.password, host=db2_hook.host,
port=db2_hook.port)
return db2_conn
# ----------------------------------------------------------------------------------------------------------------------
# Python Callable
# ----------------------------------------------------------------------------------------------------------------------
def run_this():
query = f"""
SELECT DISTINCT ON (userid) 0 AS id,
'{month}'::timestamp AS month,
userid,
balance,
transactiondate AS last_transaction_date,
transactionexternaluniqueid AS last_transaction_unique_id
FROM public.product_view
WHERE transactiondate < '{month}'
ORDER BY userid, transactiondate DESC
"""
data = pd.read_sql(query, db1_conn())
# Index.
index_query = """
SELECT COALESCE(MAX(id), 0)::INT AS id
FROM schema.product_snapshot
"""
index_start = pd.read_sql(index_query, db1_conn)
data['id'] = data.index + index_start + 1
# Write on table.
buf = io.StringIO()
stoplightdata_buffer.write(data.to_csv(index=None, header=None))
buf.seek(0)
conn = core_write()
cur = conn.cursor()
cur.copy_from(buf, table='schema.product_snapshot', sep=',', null='')
conn.commit()
# ----------------------------------------------------------------------------------------------------------------------
# DAG
# ----------------------------------------------------------------------------------------------------------------------
DAG_NAME = 'product_snapshot_dag'
args = {
'owner': 'airflow',
'start_date': datetime(2020, 3, 22),
'email': [ 'email@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=2),
'sla': timedelta(minutes=30),
'execution_timeout': timedelta(minutes=50)
}
dag = DAG(DAG_NAME,
schedule_interval='1 * 1 * *',
default_args=args)
# ----------------------------------------------------------------------------------------------------------------------
# Postgres Operator
# ----------------------------------------------------------------------------------------------------------------------
product_table_creation = PostgresOperator(task_id='product_table_creation',
postgres_conn_id='connection',
sql='sql/product_table_creation.sql',
dag=dag)
# ----------------------------------------------------------------------------------------------------------------------
# Python Operator
# ----------------------------------------------------------------------------------------------------------------------
product_snapshot_data = PythonOperator(task_id='product_snapshot_data',
provide_context=True,
python_callable=run_this,
dag=dag)
product_table_creation >> product_snapshot_data
if __name__ == "__main__":
dag.cli()
Новая папка DAG находится среди других папок DAG.