В последнее время я работаю над темой airflow и занимаюсь другой базой данных. поэтому я думаю, что я должен быть в состоянии поделиться некоторым опытом.
Полезная концепция воздушного потока:
DAG / Задачи: Вы можете просматривать и отслеживать на странице администрирования airflow-> dag.
переменная: установить и получить глобальный параметр среди различных знаков на уровне системы воздушного потока
Xcome: установить и получить параметр amongh различных задач определенного уровня дага.
Оператор Python: это могут быть экземпляры задач.
Оператор / модель БД: это могут быть экземпляры задач или объекты внутри функций Python.
- В моем случае я использую только оператор Python, а оператор, связанный с БД, был использован внутри функций, связанных с оператором Python.
3. в вашем случае вы можете сделать это в следующем псевдокоде:
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator as mysqltogcs
from datetime import timedelta
table = 'orders'
==============
def get_missing_ids(ds, **kwargs):
ti = kwargs['ti']
aws_id = get_aws_id(table)
bq_id = get_bq_id(table)
missing_id = [np.setdiff1d(aws_id,bq_id)]
missing_ids = ', '.join(map(str,missing_id))
ti.xcom_push(key='missing_ids', value=missing_ids)
===============
def get_orders_from_aws(ds, **kwargs):
missing_ids = ti.xcom_pull(key='missing_ids', task_ids='get_missing_ids')
sql = f"select * from orders where id in ({missing_ids})"
MG = mysqltogcs(sql=sql,
bucket = 'airflow_bucket',
filename = 'data/orders/db-orders{{ds}}{}',
mysql_conn_id = 'aws_readreplica',
approx_max_file_size_bytes = 100000000,
google_cloud_storage_conn_id = 'google_cloud_storage_default'
)
missing_data = MG.execute()
def print_done():
print("done boiiiii")
time.sleep(60)
==============
with DAG(dag_id="your_name", schedule_interval='timedelta(minute=5)') as dag:
task_1 = PythonOperator( task_id ="get_missing_ids",
python_callable=get_missing_ids,
provide_context=True)
task_2 = PythonOperator( task_id = 'get_orders_from_aws',
python_callable=get_orders_from_aws,
provide_context=True)
task_3 = PythonOperator( task_id='done',
python_callable=print_done)
task_1 >> task_3 >>task_3