Используйте переменную, созданную с помощью функции для задач воздушного потока - PullRequest
0 голосов
/ 26 сентября 2019

У меня есть переменная, которую я должен генерировать значение каждый раз, когда выполняю приведенную ниже функцию, и ее ответ используется для составления запроса, который мне нужно выполнить затем.В этом примере, которым я делюсь, я считаю, что это неправильный способ получения информации, которая генерируется перед выполнением задачи "bq_diff_id".У кого-нибудь есть предложения по получению этой переменной после результата предыдущего задания?

def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client()
    #IDs
    query = """SELECT ID FROM dataset.table1"""
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    diff1 = str(tuple(np.array(rows).T.tolist()))
    diff = diff1.replace("[", "").replace("]", "").replace(",)",")")
    #Count 
    count_query = """SELECT count(*) as qtt FROM dataset.table1"""
    count_query_job = bq.query(count_query)
    count_data = count_query_job.result()
    count_rows = list(count_data)
    count_end = str(count_rows[0][0])
    if int(count_end) <= 0:
        query = 'id is null'
        return query
    else:
        query = 'id in ' + diff
        return query

Python_1 = PythonOperator(task_id='bq_diff_id',
    python_callable=get_data_from_bigquery,
    dag=dag)

query = get_data_from_bigquery()

sql_query = """select id as id, \
value \
from table2 where """ + query + """ """ #Query for extract

MsSql = MsSqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mssql_conn_id=mssql_connection,
    google_cloud_storage_conn_id='gcp',
    sql=sql_query,
    bucket=nm_bucket,
    filename=nm_arquivo,
    schema_filename=sc_arquivo,
    dag=dag)
...