У меня есть переменная, которую я должен генерировать значение каждый раз, когда выполняю приведенную ниже функцию, и ее ответ используется для составления запроса, который мне нужно выполнить затем.В этом примере, которым я делюсь, я считаю, что это неправильный способ получения информации, которая генерируется перед выполнением задачи "bq_diff_id".У кого-нибудь есть предложения по получению этой переменной после результата предыдущего задания?
def get_data_from_bigquery():
"""query bigquery to get data to import to PSQL"""
bq = bigquery.Client()
#IDs
query = """SELECT ID FROM dataset.table1"""
query_job = bq.query(query)
data = query_job.result()
rows = list(data)
diff1 = str(tuple(np.array(rows).T.tolist()))
diff = diff1.replace("[", "").replace("]", "").replace(",)",")")
#Count
count_query = """SELECT count(*) as qtt FROM dataset.table1"""
count_query_job = bq.query(count_query)
count_data = count_query_job.result()
count_rows = list(count_data)
count_end = str(count_rows[0][0])
if int(count_end) <= 0:
query = 'id is null'
return query
else:
query = 'id in ' + diff
return query
Python_1 = PythonOperator(task_id='bq_diff_id',
python_callable=get_data_from_bigquery,
dag=dag)
query = get_data_from_bigquery()
sql_query = """select id as id, \
value \
from table2 where """ + query + """ """ #Query for extract
MsSql = MsSqlToGoogleCloudStorageOperator(
task_id='import_orders',
mssql_conn_id=mssql_connection,
google_cloud_storage_conn_id='gcp',
sql=sql_query,
bucket=nm_bucket,
filename=nm_arquivo,
schema_filename=sc_arquivo,
dag=dag)