Итак, я просмотрел еще несколько сайтов и нашел решение для своей проблемы ...
Проблема, с которой я столкнулся, была StepConcurrencyLevel, где я могу добавить ее, используя AWS Консоль (или) - AWS CLI (или) даже через Python с использованием BOTO3 ... Но я ожидал решения с использованием State Machine Language и нашел одно ...
Все, что нам нужно сделать, - это создать наш кластер с использованием Язык конечного автомата Мы должны указать StepConcurrencyLevel в нем как 2 (или) 3, где значение по умолчанию равно 1. После того, как он установлен, создайте 4 шага под этим кластером и запустите конечный автомат.
Где кластер распознает установленный номер параллелизма и запустит шаги соответственно.
Мой пример процесса:
-> JSON Сценарий моей оркестровки
{
"StartAt": "Create_A_Cluster",
"States": {
"Create_A_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Name": "WorkflowCluster",
"StepConcurrencyLevel": 2,
"Tags": [
{
"Key": "Description",
"Value": "process"
},
{
"Key": "Name",
"Value": "filename"
},
{
"Key": "Owner",
"Value": "owner"
},
{
"Key": "Project",
"Value": "roject"
},
{
"Key": "User",
"Value": "user"
}
],
"VisibleToAllUsers": true,
"ReleaseLabel": "emr-5.28.1",
"Applications": [
{
"Name": "Spark"
}
],
"ServiceRole": "EMR_DefaultRole",
"JobFlowRole": "EMR_EC2_DefaultRole",
"LogUri": "s3://prefix/prefix/log.txt/",
"Instances": {
"KeepJobFlowAliveWhenNoSteps": true,
"InstanceFleets": [
{
"InstanceFleetType": "MASTER",
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 90
}
]
},
{
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 1,
"InstanceTypeConfigs": [
{
"InstanceType": "m4.xlarge",
"BidPriceAsPercentageOfOnDemandPrice": 90
}
]
}
]
}
},
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 5,
"MaxAttempts": 1,
"BackoffRate": 2.5
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fail_Cluster"
}
],
"ResultPath": "$.cluster",
"OutputPath": "$.cluster",
"Next": "Add_Steps_Parallel"
},
"Fail_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
"Message.$": "$.Cause"
},
"Next": "Terminate_Cluster"
},
"Add_Steps_Parallel": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "Step_One",
"States": {
"Step_One": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"Step": {
"Name": "The first step",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"cluster",
"--master",
"yarn",
"--conf",
"spark.dynamicAllocation.enabled=true",
"--conf",
"maximizeResourceAllocation=true",
"--conf",
"spark.shuffle.service.enabled=true",
"--py-files",
"s3://prefix/prefix/pythonfile.py",
"s3://prefix/prefix/pythonfile.py"
]
}
}
},
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 5,
"MaxAttempts": 1,
"BackoffRate": 2.5
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"ResultPath": "$.err_mgs",
"Next": "Fail_SNS"
}
],
"ResultPath": "$.step1",
"Next": "Terminate_Cluster_1"
},
"Fail_SNS": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
"Message.$": "$.err_mgs.Cause"
},
"ResultPath": "$.fail_cluster",
"Next": "Terminate_Cluster_1"
},
"Terminate_Cluster_1": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
}
},
{
"StartAt": "Step_Two",
"States": {
"Step_Two": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep",
"Parameters": {
"ClusterId.$": "$.ClusterId",
"Step": {
"Name": "The second step",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"cluster",
"--master",
"yarn",
"--conf",
"spark.dynamicAllocation.enabled=true",
"--conf",
"maximizeResourceAllocation=true",
"--conf",
"spark.shuffle.service.enabled=true",
"--py-files",
"s3://prefix/prefix/pythonfile.py",
"s3://prefix/prefix/pythonfile.py"
]
}
}
},
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 5,
"MaxAttempts": 1,
"BackoffRate": 2.5
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"ResultPath": "$.err_mgs_1",
"Next": "Fail_SNS_1"
}
],
"ResultPath": "$.step2",
"Next": "Terminate_Cluster_2"
},
"Fail_SNS_1": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
"Message.$": "$.err_mgs_1.Cause"
},
"ResultPath": "$.fail_cluster_1",
"Next": "Terminate_Cluster_2"
},
"Terminate_Cluster_2": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
}
}
],
"ResultPath": "$.steps",
"Next": "Terminate_Cluster"
},
"Terminate_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
"Parameters": {
"ClusterId.$": "$.ClusterId"
},
"End": true
}
}
}
В этом сценарии (или) AWS Язык конечного автомата функции шага, при создании кластера я упомянул StepConcurrencyLevel как 2 и добавил 2 задания спарк как шаги ниже t кластер.
Когда я запустил этот сценарий в пошаговой функции, я смог организовать кластер и выполнить шаги для одновременного выполнения 2 шагов в кластере без прямой настройки его в консоли AWS EMR (или) через AWS CLI (или) даже через BOTO3.
Я просто использовал язык конечного автомата, чтобы выполнить оркестровку одновременного выполнения двух шагов в одном кластере в AWS Step Function без помощи других служб, таких как lambda или livy API или BOTO3 et c. ..
Вот как выглядит блок-схема: AWS Шаг Функция Рабочий процесс для одновременного выполнения шага
Чтобы быть более точным в том, куда я вставил StepConcurrencyLevel в выше State Machine Язык здесь:
"Create_A_Cluster": {
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
"Parameters": {
"Name": "WorkflowCluster",
"StepConcurrencyLevel": 2,
"Tags": [
{
"Key": "Description",
"Value": "process"
},
Под Create_A_Cluster .
Спасибо.