Airflow: передать параметр из функции python в MySQL Operator - PullRequest
0 голосов
/ 07 мая 2019

Что я хочу сделать:
- сравнить отсутствующий идентификатор из MySQL и BigQuery
- получить все данные из MySQL, где идентификатор отсутствующего идентификатора

table = 'orders'

def get_missing_ids():
    aws_id = get_aws_id(table)
    bq_id = get_bq_id(table)
    missing_id = [np.setdiff1d(aws_id,bq_id)]
    missing_ids = ', '.join(map(str,missing_id))
    return missing_ids

missing_ids = get_missing_ids()

get_missing_data = MysqlToGCS(
    task_id = 'get_orders_from_aws',
    sql = """select *
        from orders 
        where id in ({{params.missing_ids}})""",
    params = {'missing_ids':missing_ids},
    bucket = 'airflow_bucket',
    filename = 'data/orders/db-orders{{ds}}{}',
    mysql_conn_id = 'aws_readreplica',
    approx_max_file_size_bytes = 100000000,
    google_cloud_storage_conn_id = 'google_cloud_storage_default',
    dag=dag)

def print_done():
    print("done boiiiii")
    time.sleep(60)

task = PythonOperator(
        task_id='done',
        python_callable=print_done,
        dag=dag)

task.set_upstream(get_missing_data)

Я читал о Xcom, но яне понимаю, как это реализовать здесь.

Ответы [ 2 ]

0 голосов
/ 07 мая 2019

В последнее время я работаю над темой airflow и занимаюсь другой базой данных. поэтому я думаю, что я должен быть в состоянии поделиться некоторым опытом.

  1. Полезная концепция воздушного потока: DAG / Задачи: Вы можете просматривать и отслеживать на странице администрирования airflow-> dag.

    переменная: установить и получить глобальный параметр среди различных знаков на уровне системы воздушного потока Xcome: установить и получить параметр amongh различных задач определенного уровня дага. Оператор Python: это могут быть экземпляры задач. Оператор / модель БД: это могут быть экземпляры задач или объекты внутри функций Python.

  2. В моем случае я использую только оператор Python, а оператор, связанный с БД, был использован внутри функций, связанных с оператором Python.

3. в вашем случае вы можете сделать это в следующем псевдокоде:

from airflow import DAG

from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator  as mysqltogcs

from datetime import timedelta

table = 'orders'

==============

def get_missing_ids(ds, **kwargs):
    ti = kwargs['ti']
    aws_id = get_aws_id(table)
    bq_id = get_bq_id(table)
    missing_id = [np.setdiff1d(aws_id,bq_id)]
    missing_ids = ', '.join(map(str,missing_id))
    ti.xcom_push(key='missing_ids', value=missing_ids)

===============

def get_orders_from_aws(ds, **kwargs):
    missing_ids = ti.xcom_pull(key='missing_ids', task_ids='get_missing_ids')
    sql = f"select *  from orders where id in ({missing_ids})"
    MG = mysqltogcs(sql=sql,
                    bucket = 'airflow_bucket',
                    filename = 'data/orders/db-orders{{ds}}{}',
                    mysql_conn_id = 'aws_readreplica',
                    approx_max_file_size_bytes = 100000000,
                    google_cloud_storage_conn_id = 'google_cloud_storage_default'
                   )
    missing_data = MG.execute()




def print_done():
    print("done boiiiii")
    time.sleep(60)

==============

with DAG(dag_id="your_name", schedule_interval='timedelta(minute=5)') as dag:

        task_1 = PythonOperator( task_id ="get_missing_ids",
                        python_callable=get_missing_ids,
                        provide_context=True)

        task_2 = PythonOperator( task_id = 'get_orders_from_aws',
                        python_callable=get_orders_from_aws,
                        provide_context=True)

        task_3 = PythonOperator( task_id='done',
                        python_callable=print_done)

        task_1 >> task_3 >>task_3
0 голосов
/ 07 мая 2019

Я думаю, что самый простой способ - создать функцию create_sql_query, которая может выглядеть следующим образом:

def create_sql_query():
    missing_ids = get_missing_ids()
    return f"select * from orders where id in ({missing_ids})"

Тогда вы можете изменить get_missing_data на:

get_missing_data = MysqlToGCS(
    task_id = 'get_orders_from_aws',
    sql = create_sql_query(),
    ...

На самом деле вы даже можете написать так:

get_missing_data = MysqlToGCS(
    task_id = 'get_orders_from_aws',
    sql = f"select * from orders where id in ({get_missing_ids()})",
    ...

но это выглядит не так, как у меня.

...