Airflow / Luigi для автоматического создания кластера AWS EMR и развертывания pyspark - PullRequest
0 голосов
/ 16 апреля 2019

Я новичок в области автоматизации воздушного потока, теперь я не знаю, возможно ли это сделать с помощью Apache Airflow (или Luigi и т. Д.), Или я должен просто сделать длинный bash-файл для этого.

Я хочуbuild dag для этого

  1. Создание / клонирование кластера в AWS EMR
  2. Установка требований Python
  3. Установка связанных библиотек pyspark
  4. Получить последний код отgithub
  5. Отправить искровое задание
  6. Завершить кластер при завершении

для отдельных шагов, я могу создать .sh файлы, как показано ниже (не уверен, что это хорошо делатьэто или нет) но не знаю, как это сделать в потоке воздуха

1) создать cluser с cluster.sh

 aws emr create-cluster \
    --name "1-node dummy cluster" \
    --instance-type m3.xlarge \
    --release-label emr-4.1.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --auto-terminate

2 & 3 & 4) клонировать git и установить требования codesetup.sh

git clone some-repo.git
pip install -r requirements.txt
mv xyz.jar /usr/lib/spark/xyz.jar

5) Задание запуска зажигания sparkjob.sh

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE

6) Не уверен, может ли это быть

  terminate-clusters
--cluster-ids <value> [<value>...]

Наконец все это может быть выполненокак один файл .sh.Мне нужно знать хороший подход к этому с помощью airflow / luigi.

Что я нашел:

Я считаю, что это сообщение близко, но оно устарело (2016) и пропускает соединения и код дляпьесы

https://www.agari.com/email-security-blog/automated-model-building-emr-spark-airflow/

Ответы [ 2 ]

0 голосов
/ 31 мая 2019

Я понял, что может быть два варианта сделать это

1) мы можем сделать скрипт bash с помощью emr create-cluster и addstep, а затемиспользуйте airflow Bashoperator, чтобы запланировать его

В качестве альтернативы, есть обертка вокруг этих двух, называемая sparksteps

Пример из их документации

sparksteps examples/episodes.py \
  --s3-bucket $AWS_S3_BUCKET \
  --aws-region us-east-1 \
  --release-label emr-4.7.0 \
  --uploads examples/lib examples/episodes.avro \
  --submit-args="--deploy-mode client --jars /home/hadoop/lib/spark-avro_2.10-2.0.2-custom.jar" \
  --app-args="--input /home/hadoop/episodes.avro" \
  --tags Application="Spark Steps" \
  --debug

вы можете сделать .sh script с опцией по умолчанию на ваш выбор.После подготовки этого сценария вы можете вызвать его из Bashoperator воздушного потока, как показано ниже:

create_command = "sparkstep_custom.sh "    

t1 = BashOperator(
        task_id= 'create_file',
        bash_command=create_command,
        dag=dag
   )

2) Вы можете использовать собственные операторы воздушного потока для aws, чтобы сделать это.

EmrCreateJobFlowOperator (для запуска кластера) EmrAddStepsOperator (для отправки искрового задания) EmrStepSensor (для отслеживания завершения шага) EmrTerminateJobFlowOperator (для завершения кластера после завершения шага)

Базовый пример создания кластера и отправки шага

my_step=[

    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'test.py', '/home/hadoop/']
        }
    },
{
        'Name': 'setup - copy files 3',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'myfiledependecy.py', '/home/hadoop/']
        }
    },
 {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit','--jars', "jar1.jar,jar2.jar", '--py-files','/home/hadoop/myfiledependecy.py','/home/hadoop/test.py']
        }
    }
    ]


cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=my_steps,
    dag=dag
)
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('pre_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

Кроме того, чтобы загрузить код в s3 (где мне было интересно получить последний код из github_, это можно сделать с помощью s3, boto3 и Pythonoperator

Простой пример

S3_BUCKET = 'you_bucket_name'
S3_URI = 's3://{bucket}/'.format(bucket=S3_BUCKET)
def upload_file_to_S3(filename, key, bucket_name):
    s3.Bucket(bucket_name).upload_file(filename, key)

upload_to_S3_task = PythonOperator(
    task_id='upload_to_S3',
    python_callable=upload_file_to_S3,
    op_kwargs={
        'filename': configdata['project_path']+'test.py',
        'key': 'test.py',
        'bucket_name': 'dep-buck',
    },
    dag=dag)
0 голосов
/ 16 апреля 2019

В Airflow есть операторы для этого. поток воздуха документ

...