Emr Launch с AWS лямбда - PullRequest
       33

Emr Launch с AWS лямбда

0 голосов
/ 06 февраля 2020

Я пытаюсь создать aws лямбду в python для запуска кластера EMR. Ранее я запускал EMR, используя скрипт bash и cron Tab. Поскольку моя работа выполняется только ежедневно, поэтому попытка перейти к лямбде, как вызов кластера, занимает совсем мало времени.

Я написал ниже скрипт для запуска EMR. Но получить исключение поддержки пряжи. Что я тут не так делаю?

Исключение

org.apache.spark.SparkException: Unable to load YARN support
    at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:405)
    at org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:400)
    at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:400)
    at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:425)
    at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:2387)
    at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:156)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:351)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:432)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
    at com.mmt.sp.hotelsttlengine.util.SparkConnectionFactory.createNewOrExistingSession(SparkConnectionFactory.java:32)
    at com.mmt.sp.hotelsttlengine.starter.GenerateUnifiedErrRefreshReport.run(GenerateUnifiedErrRefreshReport.java:67)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
    at com.mmt.sp.hotelsttlengine.starter.GenerateUnifiedErrRefreshReport.main(GenerateUnifiedErrRefreshReport.java:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
    at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:401)

Работа Bash Скрипт

#!/usr/bin/env bash
. ./config.sh
# Termination protection on
TERMINATION_PROTECTED=--no-termination-protected

# comment to turn auto terminate off. Putting by default true.
AUTO_TERMINATE=--auto-terminate



#Access key name. Please Change this as per the machine.
KEY_NAME=keyname

# S3 URI for logs
LOG_URI=s3://bucketpath/clusterlogs/

#source files path
S3_BUCKET_URL=s3://bucketpath/


# create cluster
json=$(aws emr create-cluster $TERMINATION_PROTECTED $AUTO_TERMINATE \
  --applications Name=Hadoop Name=Spark \
  --configurations file://spark-config.json \
  --release-label emr-5.8.0 \
  --ec2-attributes "{\"KeyName\":\"$KEY_NAME\",
                     \"SubnetId\":\"$SUBNET_ID\",
                     \"InstanceProfile\":\"EMR_EC2_DefaultRole\",
                     \"ServiceAccessSecurityGroup\":\"$SERVICE_ACCESS_SECURITY_GROUP\",
                     \"EmrManagedMasterSecurityGroup\":\"$EMR_MANAGED_MASTER_SECURITY_GROUP\",
                     \"EmrManagedSlaveSecurityGroup\":\"$EMR_MANAGED_SLAVE_SECURITY_GROUP\"}"\
  --service-role EMR_DefaultRole \
  --enable-debugging \
  --release-label emr-5.8.0 \
  --log-uri $LOG_URI \
  --instance-groups '[{"InstanceCount":1,
                       "InstanceGroupType":"MASTER",
                       "InstanceType":"m4.xlarge",
                       "Name":"Master instance group - 1"},
                      {"InstanceCount":6,
                       "InstanceGroupType":"CORE",
                       "InstanceType":"m4.2xlarge",
                       "Name":"Core instance group - 1"}]' \
 --name 'myjobname '\
  --region my-region\
  --steps "[
{\"Args\":[

{\"Args\":[
                \"--class\",\"com.test.MainClass\",
                     \"s3://bucketpath/jars/application/myapplication.jar\",
             \"prod\",\"$arg1\",\"$arg2\\",\"$arg3\",\"$arg4\"],
             \"Type\":\"SPARK\",
             \"ActionOnFailure\":\"$STEP_FAILURE_ACTION\",
             \"Properties\":\"\",
             \"Name\":\"mystep\"}
]"\
)


# check error; if none get cluster_id
if [ $? -ne 0 ]; then
  echo Failed to create cluster 1>&2
  exit $STATUS
fi
cluster_id=`echo $json | grep '"ClusterId":' | cut -d'"' -f4`


# wait for it to run
SECONDS=0
echo -n Waiting for cluster $cluster_id to run ...
aws emr wait cluster-running --cluster-id $cluster_id
duration=$SECONDS
echo CLUSTER-ID: $cluster_id
echo STATUS:UP
echo TIME TAKEN: $duration SECONDS

Python Скрипт


import boto3

LOG_URI = "s3://log path/"


S3_BUCKET_URL = "s3://bucket path/"


def launchCluster(**kwargs):
    connection = boto3.client('emr', region_name='region_name')

    cluster_id = connection.run_job_flow(Name='myjob name', LogUri=LOG_URI,
                                         ReleaseLabel='emr-5.28.0',
                                         ServiceRole= 'EMR_DefaultRole',
                                         Applications=[
                                             {
                                                 'Name': 'Hadoop'
                                             },
                                             {
                                                 'Name': 'Spark'
                                             }

                                         ],

                                         Configurations=[{
                                             "Classification": "spark-defaults",
                                             "Properties": {
                                                 "spark.eventLog.dir": "s3a://evnlogpath/spark-history/",
                                                 "spark.history.fs.logDirectory": "s3a://eventlogpath/spark-history/",
                                                 "spark.eventLog.enabled": "true",
                                                 "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
                                                 "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
                                                 "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "false",
                                                 "spark.hadoop.parquet.enable.summary-metadata": "false",
                                                 "spark.executor.memory": "15g",
                                                 "maximizeResourceAllocation": "true",
                                                 "spark.driver.memory": "14g",
                                                 "spark.driver.maxResultSize": "10g"

                                             }
                                         }, {
                                             "Classification": "spark-log4j",
                                             "Properties": {
                                                 "spark.log4j.rootCategory": "WARN",
                                                 "spark.log4j.appender.file.layout": "org.apache.log4j.EnhancedPatternLayout",
                                                 "spark.log4j.appender.file.layout.ConversionPattern": "%d{yy/MM/dd HH:mm:ss.SSS}{IST} %t %p %c{1}: %m%n"
                                             }
                                         }, {
                                             "Classification": "spark",
                                             "Properties": {
                                                 "maximizeResourceAllocation": "true"
                                             }
                                         }
                                         ],
                                         Instances={
                                             'InstanceGroups': [
                                                 {"InstanceCount": 1,
                                                  "InstanceRole": "MASTER",
                                                  "InstanceType": "m4.xlarge",
                                                  "Name": "Master instance group - 1"},
                                                 {"InstanceCount": 6,
                                                  "InstanceRole": "CORE",
                                                  "InstanceType": "m4.2xlarge",
                                                  "Name": "Core instance group - 1"}],

                                             "EmrManagedMasterSecurityGroup": "...",
                                             "EmrManagedSlaveSecurityGroup": "...",
                                             "ServiceAccessSecurityGroup": "...",
                                             "Ec2SubnetId": "...",
                                             # "InstanceRole":"MASTER",

                                             "Ec2KeyName": "...",
                                             "KeepJobFlowAliveWhenNoSteps": False,
                                             "TerminationProtected": False,
                                         },
                                         Steps=[

                                             {
                                                 "Name": "xz",
                                                 "ActionOnFailure": "TERMINATE_CLUSTER",
                                                 'HadoopJarStep': {
                                                     "Jar": "s3://bucket/jars/application/some-app.jar",
                                                     # "Properties": "",
                                                     "MainClass": "com.test.MainClass",
                                                     "Args": [...],
                                                                                                  }
                                             }

                                         ],
                                         VisibleToAllUsers=True,
                                         JobFlowRole='EMR_EC2_DefaultRole',
                                         Tags=[
                                             {
                                                 'Key': '..',
                                                 'Value': '..',
                                             }...
                                         ],
                                         )
    print("cluster is created:", str(cluster_id))
    return cluster_id


launchCluster()

Искровая пряжа

sparks =  SparkSession.builder().master("yarn").appName(jobName)
                    .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
                    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                    .config("spark.hadoop.parquet.enable.summary-metadata", "false")
                    .config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
                    .getOrCreate();

Ответы [ 2 ]

0 голосов
/ 13 февраля 2020

Вот часть шага, которая используется в моем приложении с python boto3 для запуска EMR. Вам не нужно устанавливать мастер как пряжу в источнике искры.

...
        Steps=[{
            'Name': 'Main',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                         '--master', 'yarn', '--deploy-mode', 'client',
                         '--class', '<class name>',
                         's3://bucket/path/<jar name>.jar', '<argument>'
                         ]
            }
        }],
...
0 голосов
/ 12 февраля 2020

Насколько я помню, oop jar step будет использовать команду runner для отправки толстого фляги для отправки приложения в пряжу. Поэтому я бы сказал, удалить .master ("пряжа") и попытаться запустить. если произошла ошибка, добавьте ее сюда в сообщении.

...