Я создал процесс в потоке воздуха, где мне нужно каждые 10 минут экспортировать новый файл из базы данных SQL Server и играть в BigQuery! Сгенерированный файл представляет собой CSV-файл, который автоматически содержит имя файла с датой обработки в формате ГГГГММДДЧЧММСС.
При переходе от шага 1 (экспорт) к шагу 2 (вставить в BigQuery) реле воздушного потока снова каждый сценарий меняет имя переменной имени файла, а дата обработки отличается от шага 1!
Пример:
Шаг 1: test_20190624113656.csv
Шаг 2: test_20190624113705.csv
В этом случае я хотел бы сохранить имя файла на первом этапе.
nm_arquivo = 'test_' + datetime.today().strftime('%Y%m%d%H%M%S') + '.csv'
def insert_bigquery(ds, **kwargs):
bigquery_client = bigquery.Client(project="project_name")
dataset_ref = bigquery_client.dataset('test_dataset')
job_config = bigquery.LoadJobConfig()
job_config.schema = [
bigquery.SchemaField('id','INTEGER',mode='REQUIRED'),
bigquery.SchemaField('sigla','STRING',mode='REQUIRED'),
bigquery.SchemaField('nome_en','STRING',mode='REQUIRED'),
bigquery.SchemaField('nome_pt','STRING',mode='REQUIRED'),
]
job_config.source_format = bigquery.SourceFormat.CSV
time_partitioning = bigquery.table.TimePartitioning()
job_config.time_partitioning = time_partitioning
job_config.clustering_fields = ["id", "sigla"]
uri = "gs://bucket_name/"+nm_arquivo
load_job = bigquery_client.load_table_from_uri(
uri,
dataset_ref.table('bdb'),
job_config=job_config
)
print('Starting job {}'.format(load_job.job_id))
load_job.result()
print('Job finished.')
#step1
import_orders_op = MsSqlToGoogleCloudStorageOperator(
task_id='import_orders',
mssql_conn_id='mssql_conn',
google_cloud_storage_conn_id='gcp_conn',
sql="""select * from bdb""",
bucket='bucket_name',
filename=nm_arquivo,
dag=dag)
#step2
run_this = PythonOperator(
task_id='insert_bigquery',
provide_context=True,
python_callable=insert_bigquery,
dag=dag,
)
run_this.set_upstream(import_orders_op)