Задача воздушного потока занимает 7 минут - PullRequest
0 голосов
/ 19 сентября 2018

Я начинаю использовать Airflow, и мой первый рабочий процесс включает перемещение файла из GCP в S3 (туда и обратно).

Задача, выполняющая работу (и весь DAG), успешно завершается, но для этого требуется 7минут, которые можно увидеть в журналах ниже для передачи файлов (и я предполагаю, что некоторые вещи аутентификации и протокола).

[2018-09-19 13: 58: 34,498] {logging_mixin.py:95} INFO - [2018-09-19 13: 58: 34,496] {credentials.py:1032} INFO - Найденные учетные данные в файле общих учетных данных: ~ / .aws / credentials

[2018-09-1914: 05: 55,920] {logging_mixin.py:95} INFO - [2018-09-19 14: 05: 55,920] {gcp_api_base_hook.py:84} INFO - получение соединения с использованием google.auth.default(), так как для ловушки не определен файл ключей.

В той же группе обеспечения доступности баз данных есть задача для выполнения дополнительной задачи - передача файла из S3 в GCP, и это быстро (менее 1 минуты).

from __future__ import print_function

from builtins import range
from datetime import datetime
import airflow
from airflow.operators import OmegaFileSensor
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
from airflow.models import DAG

import time
from pprint import pprint

S3_BUCKET = 'data-preprod-redshift-exports'
# S3_OBJECT = 'airflow/seattlecheckoutsbytitle.zip' # 2GB
S3_OBJECT = 'airflow/cnpjqsa.zip' # 400Mb   
# S3_OBJECT = '/airflow/chicagobusinesslicensesandowners.zip' # 100 Mb  

GCS_BUCKET = 'ds_de_airflow' 

args = {
    'owner': 'airflow',
    'start_date': datetime(2018,9,18)#,
    #'execution_timeout':None,
    #'dagrun_timeout': None
}

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

with DAG( dag_id='a_second', default_args=args, schedule_interval=None) as dag:

    run_this = PythonOperator(
        task_id='run_this',
        provide_context=True,
        python_callable=print_context
    )

    s3_to_gcs_op = S3ToGoogleCloudStorageOperator( 
        task_id = 's3_to_gcs_op', 
        bucket = S3_BUCKET,
        prefix = S3_OBJECT, 
        dest_gcs_conn_id = 'google_cloud_default',
        dest_gcs = 'gs://ds_de_airflow/Task1_upload/', 
        replace = False
    )

    # for some reason this takes no less than 7 minutes (tried 3 times) 
    gcs_to_s3_op = GoogleCloudStorageToS3Operator(
        task_id = 'gcs_to_s3_op', 
        bucket = GCS_BUCKET,
        prefix = 'Task1_upload',
        delimiter = 'fileGCS.txt',
        google_cloud_storage_conn_id ='google_cloud_default',
        dest_aws_conn_id = 'aws_default',
        dest_s3_key = 's3://data-preprod-redshift-exports/airflow/',
        replace = False
    )

    gcs_sensor = GoogleCloudStorageObjectSensor(
        task_id = 'gcs_sensor',
        bucket = GCS_BUCKET,
        object = 'Task1_upload/airflow/fileS3.txt'  # this is not the most interesting file to sensor for but for now...
    )

    run_this >> s3_to_gcs_op >> gcs_sensor >> gcs_to_s3_op

Мы установили поток воздуха в Google Cloud Shell с механизмом БД по умолчанию (1 поток).

Вопрос, который я задаюs: как сократить время выполнения этой 7-минутной задачи до чего-то более разумного?

...