Airflow - скрипт изменяет имя файла переменной - PullRequest
0 голосов
/ 24 июня 2019

Я создал процесс в потоке воздуха, где мне нужно каждые 10 минут экспортировать новый файл из базы данных SQL Server и играть в BigQuery! Сгенерированный файл представляет собой CSV-файл, который автоматически содержит имя файла с датой обработки в формате ГГГГММДДЧЧММСС.

При переходе от шага 1 (экспорт) к шагу 2 (вставить в BigQuery) реле воздушного потока снова каждый сценарий меняет имя переменной имени файла, а дата обработки отличается от шага 1!

Пример: Шаг 1: test_20190624113656.csv Шаг 2: test_20190624113705.csv

В этом случае я хотел бы сохранить имя файла на первом этапе.

nm_arquivo = 'test_' + datetime.today().strftime('%Y%m%d%H%M%S') + '.csv'

def insert_bigquery(ds, **kwargs):
    bigquery_client = bigquery.Client(project="project_name")
    dataset_ref = bigquery_client.dataset('test_dataset')
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id','INTEGER',mode='REQUIRED'),
        bigquery.SchemaField('sigla','STRING',mode='REQUIRED'),
        bigquery.SchemaField('nome_en','STRING',mode='REQUIRED'),
        bigquery.SchemaField('nome_pt','STRING',mode='REQUIRED'),
    ]   
    job_config.source_format = bigquery.SourceFormat.CSV
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    job_config.clustering_fields = ["id", "sigla"]
    uri = "gs://bucket_name/"+nm_arquivo
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table('bdb'),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

#step1      
import_orders_op = MsSqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mssql_conn_id='mssql_conn',
    google_cloud_storage_conn_id='gcp_conn',
    sql="""select * from bdb""",
    bucket='bucket_name',
    filename=nm_arquivo,
    dag=dag) 

#step2
run_this = PythonOperator(
    task_id='insert_bigquery',
    provide_context=True,
    python_callable=insert_bigquery,
    dag=dag,
)

run_this.set_upstream(import_orders_op)

Ответы [ 2 ]

2 голосов
/ 25 июня 2019

Вы должны использовать время выполнения DAG.

Вы можете использовать {{ ts_nodash }} Макросы воздушного потока.Он форматирует execution_date.isoformat() (пример: 2018-01-01T00:00:00+00:00) для удаления - & :, пример: 20180101T000000.Этот макрос можно использовать в любом шаблонном параметре.

Для получения дополнительной информации и списка всех других доступных переменных:

0 голосов
/ 24 июня 2019

Вы можете использовать файл для хранения имени файла:

import pickle

nm_arquivo = 'test_' + datetime.today().strftime('%Y%m%d%H%M%S') + '.csv'

#step 1
with open('filename.pickle', 'wb') as handle:
    pickle.dump(nm_arquivo, handle)

#step 2
with open('filename.pickle', 'rb') as handle:
    nm_arquivo = pickle.load(handle)

...