Я начинаю использовать Airflow, и мой первый рабочий процесс включает перемещение файла из GCP в S3 (туда и обратно).
Задача, выполняющая работу (и весь DAG), успешно завершается, но для этого требуется 7минут, которые можно увидеть в журналах ниже для передачи файлов (и я предполагаю, что некоторые вещи аутентификации и протокола).
[2018-09-19 13: 58: 34,498] {logging_mixin.py:95} INFO - [2018-09-19 13: 58: 34,496] {credentials.py:1032} INFO - Найденные учетные данные в файле общих учетных данных: ~ / .aws / credentials
[2018-09-1914: 05: 55,920] {logging_mixin.py:95} INFO - [2018-09-19 14: 05: 55,920] {gcp_api_base_hook.py:84} INFO - получение соединения с использованием google.auth.default()
, так как для ловушки не определен файл ключей.
В той же группе обеспечения доступности баз данных есть задача для выполнения дополнительной задачи - передача файла из S3 в GCP, и это быстро (менее 1 минуты).
from __future__ import print_function
from builtins import range
from datetime import datetime
import airflow
from airflow.operators import OmegaFileSensor
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
from airflow.models import DAG
import time
from pprint import pprint
S3_BUCKET = 'data-preprod-redshift-exports'
# S3_OBJECT = 'airflow/seattlecheckoutsbytitle.zip' # 2GB
S3_OBJECT = 'airflow/cnpjqsa.zip' # 400Mb
# S3_OBJECT = '/airflow/chicagobusinesslicensesandowners.zip' # 100 Mb
GCS_BUCKET = 'ds_de_airflow'
args = {
'owner': 'airflow',
'start_date': datetime(2018,9,18)#,
#'execution_timeout':None,
#'dagrun_timeout': None
}
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
with DAG( dag_id='a_second', default_args=args, schedule_interval=None) as dag:
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=print_context
)
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id = 's3_to_gcs_op',
bucket = S3_BUCKET,
prefix = S3_OBJECT,
dest_gcs_conn_id = 'google_cloud_default',
dest_gcs = 'gs://ds_de_airflow/Task1_upload/',
replace = False
)
# for some reason this takes no less than 7 minutes (tried 3 times)
gcs_to_s3_op = GoogleCloudStorageToS3Operator(
task_id = 'gcs_to_s3_op',
bucket = GCS_BUCKET,
prefix = 'Task1_upload',
delimiter = 'fileGCS.txt',
google_cloud_storage_conn_id ='google_cloud_default',
dest_aws_conn_id = 'aws_default',
dest_s3_key = 's3://data-preprod-redshift-exports/airflow/',
replace = False
)
gcs_sensor = GoogleCloudStorageObjectSensor(
task_id = 'gcs_sensor',
bucket = GCS_BUCKET,
object = 'Task1_upload/airflow/fileS3.txt' # this is not the most interesting file to sensor for but for now...
)
run_this >> s3_to_gcs_op >> gcs_sensor >> gcs_to_s3_op
Мы установили поток воздуха в Google Cloud Shell с механизмом БД по умолчанию (1 поток).
Вопрос, который я задаюs: как сократить время выполнения этой 7-минутной задачи до чего-то более разумного?