Как выполнить S3 в BigQuery с помощью Airflow? - PullRequest
0 голосов
/ 03 сентября 2018

В настоящее время нет оператора S3ToBigQuery .

Мой выбор:

  1. Используйте S3ToGoogleCloudStorageOperator , а затем используйте GoogleCloudStorageToBigQueryOperator

    Это не то, что я стремлюсь сделать. Это означает двойную оплату за хранение. Даже при удалении файла из одного из хранилищ, которое по-прежнему связано с оплатой.

  2. Загрузите файл из S3 в локальную файловую систему и загрузите его в BigQuery из файловой системы - однако S3DownloadOperator не существует. Это означает, что нужно писать весь процесс с нуля без участия Airflow. Это упускает смысл использования воздушного потока.

Есть ли другой вариант? Что бы вы предложили сделать?

Ответы [ 3 ]

0 голосов
/ 04 сентября 2018

Вместо этого вы можете использовать S3ToGoogleCloudStorageOperator, а затем GoogleCloudStorageToBigQueryOperator с табличным флагом external_table, т.е. передать external_table =True.

Это создаст внешние данные, которые указывают на местоположение GCS и не сохранят ваши данные в BigQuery, но вы все равно можете запросить их.

0 голосов
/ 24 сентября 2018

Это то, чем я закончил. Это должно быть преобразовано в оператор S3toLocalFile.

def download_from_s3(**kwargs):
    hook = S3Hook(aws_conn_id='project-s3')    

    result = hook.read_key(bucket_name='stage-project-metrics',
                           key='{}.csv'.format(kwargs['ds']))

    if not result:
        logging.info('no data found')
    else:
        outfile = '{}project{}.csv'.format(Variable.get("data_directory"),kwargs['ds'])

        f=open(outfile,'w+')
        f.write(result)
        f.close()

    return result
0 голосов
/ 03 сентября 2018

Если первый вариант ограничивает стоимость, вы можете просто использовать S3Hook для загрузки файла через PythonOperator:

from airflow.hooks.S3_hook import S3Hook
from datetime import timedelta, datetime
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}
def download_from_s3(**kwargs):


    hook = S3Hook(aws_conn_id='s3_conn')

    hook.read_key(bucket_name='workflows-dev',
                   key='test_data.csv')

dag = DAG('s3_download',
          schedule_interval='@daily',
          default_args=default_args,
          catchup=False)

with dag:
download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_from_s3,
        provide_context=True
    )
...