I am trying to write an AWS Lambda in Python that launches an EMR cluster. Previously I launched EMR from a bash script scheduled with a cron tab. Since my job runs only once a day, and the call that launches the cluster takes very little time, I am trying to move it to Lambda.
I wrote the script below to launch EMR, but I am getting a YARN support exception. What am I doing wrong here?
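For context, the plan is to trigger the function from a daily CloudWatch Events / EventBridge rule, with the Lambda handler as a thin wrapper around the launch function shown further down. A minimal sketch (the handler name and return shape here are placeholders, not my actual setup):

def lambda_handler(event, context):
    # launchCluster() is defined in the Python script below;
    # the daily schedule itself lives in the EventBridge rule, not here.
    cluster_id = launchCluster()
    return {"ClusterId": cluster_id}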
Exception
org.apache.spark.SparkException: Unable to load YARN support
at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:405)
at org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:400)
at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:400)
at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:425)
at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:2387)
at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:156)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:351)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:432)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
at com.mmt.sp.hotelsttlengine.util.SparkConnectionFactory.createNewOrExistingSession(SparkConnectionFactory.java:32)
at com.mmt.sp.hotelsttlengine.starter.GenerateUnifiedErrRefreshReport.run(GenerateUnifiedErrRefreshReport.java:67)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:813)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:797)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.mmt.sp.hotelsttlengine.starter.GenerateUnifiedErrRefreshReport.main(GenerateUnifiedErrRefreshReport.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:401)
Working bash script
#!/usr/bin/env bash
. ./config.sh
# Termination protection off (pass --termination-protected to turn it on)
TERMINATION_PROTECTED=--no-termination-protected
# Comment out to disable auto-terminate; on by default.
AUTO_TERMINATE=--auto-terminate
# EC2 key pair name. Change this per machine.
KEY_NAME=keyname
# S3 URI for logs
LOG_URI=s3://bucketpath/clusterlogs/
# Source files path
S3_BUCKET_URL=s3://bucketpath/
# create cluster
json=$(aws emr create-cluster $TERMINATION_PROTECTED $AUTO_TERMINATE \
--applications Name=Hadoop Name=Spark \
--configurations file://spark-config.json \
--release-label emr-5.8.0 \
--ec2-attributes "{\"KeyName\":\"$KEY_NAME\",
\"SubnetId\":\"$SUBNET_ID\",
\"InstanceProfile\":\"EMR_EC2_DefaultRole\",
\"ServiceAccessSecurityGroup\":\"$SERVICE_ACCESS_SECURITY_GROUP\",
\"EmrManagedMasterSecurityGroup\":\"$EMR_MANAGED_MASTER_SECURITY_GROUP\",
\"EmrManagedSlaveSecurityGroup\":\"$EMR_MANAGED_SLAVE_SECURITY_GROUP\"}"\
--service-role EMR_DefaultRole \
--enable-debugging \
--log-uri $LOG_URI \
--instance-groups '[{"InstanceCount":1,
"InstanceGroupType":"MASTER",
"InstanceType":"m4.xlarge",
"Name":"Master instance group - 1"},
{"InstanceCount":6,
"InstanceGroupType":"CORE",
"InstanceType":"m4.2xlarge",
"Name":"Core instance group - 1"}]' \
--name 'myjobname' \
--region my-region \
--steps "[
  {\"Args\":[
      \"--class\",\"com.test.MainClass\",
      \"s3://bucketpath/jars/application/myapplication.jar\",
      \"prod\",\"$arg1\",\"$arg2\",\"$arg3\",\"$arg4\"],
   \"Type\":\"SPARK\",
   \"ActionOnFailure\":\"$STEP_FAILURE_ACTION\",
   \"Properties\":\"\",
   \"Name\":\"mystep\"}
]"
)
# check error; if none, get cluster_id
STATUS=$?
if [ $STATUS -ne 0 ]; then
    echo "Failed to create cluster" 1>&2
    exit $STATUS
fi
cluster_id=$(echo "$json" | grep '"ClusterId":' | cut -d'"' -f4)
# wait for it to run
SECONDS=0
echo -n Waiting for cluster $cluster_id to run ...
aws emr wait cluster-running --cluster-id $cluster_id
duration=$SECONDS
echo CLUSTER-ID: $cluster_id
echo STATUS:UP
echo TIME TAKEN: $duration SECONDS
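In the Lambda version I expect to replace the `aws emr wait cluster-running` call with the equivalent boto3 waiter, roughly like this (a sketch; the region is a placeholder, and a long wait could of course hit the Lambda timeout):

import boto3

emr = boto3.client('emr', region_name='my-region')

def wait_for_cluster(cluster_id):
    # boto3 equivalent of `aws emr wait cluster-running`
    emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)
    print("CLUSTER-ID:", cluster_id)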
Python script
import boto3

LOG_URI = "s3://log path/"
S3_BUCKET_URL = "s3://bucket path/"


def launchCluster(**kwargs):
    connection = boto3.client('emr', region_name='region_name')
    response = connection.run_job_flow(
        Name='myjob name',
        LogUri=LOG_URI,
        ReleaseLabel='emr-5.28.0',
        ServiceRole='EMR_DefaultRole',
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Spark'},
        ],
        Configurations=[
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.eventLog.dir": "s3a://evnlogpath/spark-history/",
                    "spark.history.fs.logDirectory": "s3a://eventlogpath/spark-history/",
                    "spark.eventLog.enabled": "true",
                    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
                    "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
                    "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "false",
                    "spark.hadoop.parquet.enable.summary-metadata": "false",
                    "spark.executor.memory": "15g",
                    "maximizeResourceAllocation": "true",
                    "spark.driver.memory": "14g",
                    "spark.driver.maxResultSize": "10g"
                }
            },
            {
                "Classification": "spark-log4j",
                "Properties": {
                    "spark.log4j.rootCategory": "WARN",
                    "spark.log4j.appender.file.layout": "org.apache.log4j.EnhancedPatternLayout",
                    "spark.log4j.appender.file.layout.ConversionPattern": "%d{yy/MM/dd HH:mm:ss.SSS}{IST} %t %p %c{1}: %m%n"
                }
            },
            {
                "Classification": "spark",
                "Properties": {
                    "maximizeResourceAllocation": "true"
                }
            }
        ],
        Instances={
            'InstanceGroups': [
                {"InstanceCount": 1,
                 "InstanceRole": "MASTER",
                 "InstanceType": "m4.xlarge",
                 "Name": "Master instance group - 1"},
                {"InstanceCount": 6,
                 "InstanceRole": "CORE",
                 "InstanceType": "m4.2xlarge",
                 "Name": "Core instance group - 1"}],
            "EmrManagedMasterSecurityGroup": "...",
            "EmrManagedSlaveSecurityGroup": "...",
            "ServiceAccessSecurityGroup": "...",
            "Ec2SubnetId": "...",
            "Ec2KeyName": "...",
            "KeepJobFlowAliveWhenNoSteps": False,
            "TerminationProtected": False,
        },
        Steps=[
            {
                "Name": "xz",
                "ActionOnFailure": "TERMINATE_CLUSTER",
                'HadoopJarStep': {
                    "Jar": "s3://bucket/jars/application/some-app.jar",
                    # "Properties": "",
                    "MainClass": "com.test.MainClass",
                    "Args": [...],
                }
            }
        ],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        Tags=[
            {
                'Key': '..',
                'Value': '..',
            },  # ...
        ],
    )
    # run_job_flow returns a dict; the cluster id is under 'JobFlowId'
    print("cluster is created:", response['JobFlowId'])
    return response['JobFlowId']


launchCluster()
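One difference I notice between the two versions: the CLI step uses "Type":"SPARK", which as far as I understand expands into a command-runner.jar step that runs spark-submit, while my boto3 HadoopJarStep points straight at the application jar, so it appears to be launched with plain hadoop jar (the stack trace goes through org.apache.hadoop.util.RunJar). For comparison, the command-runner form of the step would look roughly like this (jar path, class, args, and deploy mode are placeholders/assumptions from my setup):

# Sketch of the command-runner.jar step form; I believe this is what the
# CLI's "Type":"SPARK" shorthand produces under the hood.
spark_step = {
    "Name": "mystep",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": [
            "spark-submit",
            "--deploy-mode", "cluster",  # assumption: cluster deploy mode
            "--class", "com.test.MainClass",
            "s3://bucket/jars/application/some-app.jar",
            "prod",  # application args follow the jar
        ],
    },
}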
Spark session (YARN)
SparkSession sparks = SparkSession.builder().master("yarn").appName(jobName)
        .config("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .config("spark.hadoop.parquet.enable.summary-metadata", "false")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate();