Можно ли выполнить пошаговый параллелизм для AWS EMR через AWS STEP FUNCTION без лямбды? - PullRequest
0 голосов
/ 15 января 2020

Это мой сценарий, я пытаюсь создать 4 AWS кластеров EMR, где каждому кластеру будет назначено 2 задания, поэтому это будет похоже на 4 кластера с 8 заданиями, организованными с использованием функции шага.

Мой поток должен выглядеть следующим образом:

4 Кластеры будут запускаться одновременно, выполняя 8 заданий параллельно, при этом каждый кластер будет запускать 2 задания параллельно.

Недавно AWS запустил эту функцию для одновременного запуска 2 (или) нескольких заданий в одном кластере, используя StepConcurrencyLevel в EMR, чтобы сократить время работы кластера, которое может быть выполняется с помощью консоли EMR, AWS CLI (или) даже через AWS лямбда.

Но я хочу выполнить этот процесс запуска 2 (или) большего количества заданий параллельно в одном кластере, используя AWS Шаг Функция с языком конечного автомата, подобным формату, указанному здесь https://docs.aws.amazon.com/step-functions/latest/dg/connect-emr.html

Я пытался ссылаться на многие сайты для выполнения этого процесса, где я получаю решение для этого через консоль (или) через формат boto3 в AWS лямбда, но я не смог найти решение по выполнению этого через саму пошаговую функцию ...

Есть ли какое-то решение для этого!?

Заранее спасибо ..

1 Ответ

0 голосов
/ 29 января 2020

Итак, я просмотрел еще несколько сайтов и нашел решение для своей проблемы ...

Проблема, с которой я столкнулся, была StepConcurrencyLevel, где я могу добавить ее, используя AWS Консоль (или) - AWS CLI (или) даже через Python с использованием BOTO3 ... Но я ожидал решения с использованием State Machine Language и нашел одно ...

Все, что нам нужно сделать, - это создать наш кластер с использованием Язык конечного автомата Мы должны указать StepConcurrencyLevel в нем как 2 (или) 3, где значение по умолчанию равно 1. После того, как он установлен, создайте 4 шага под этим кластером и запустите конечный автомат.

Где кластер распознает установленный номер параллелизма и запустит шаги соответственно.

Мой пример процесса:

-> JSON Сценарий моей оркестровки

 {
  "StartAt": "Create_A_Cluster",
  "States": {
    "Create_A_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "WorkflowCluster",
        "StepConcurrencyLevel": 2,
        "Tags": [
          {
            "Key": "Description",
            "Value": "process"
          },
          {
            "Key": "Name",
            "Value": "filename"
          },
          {
            "Key": "Owner",
            "Value": "owner"
          },
          {
            "Key": "Project",
            "Value": "roject"
          },
          {
            "Key": "User",
            "Value": "user"
          }
        ],
        "VisibleToAllUsers": true,
        "ReleaseLabel": "emr-5.28.1",
        "Applications": [
          {
            "Name": "Spark"
          }
        ],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "LogUri": "s3://prefix/prefix/log.txt/",
        "Instances": {
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "TargetSpotCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge",
                  "BidPriceAsPercentageOfOnDemandPrice": 90
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "TargetSpotCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge",
                  "BidPriceAsPercentageOfOnDemandPrice": 90
                }
              ]
            }
          ]
        }
      },
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 5,
          "MaxAttempts": 1,
          "BackoffRate": 2.5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Fail_Cluster"
        }
      ],
      "ResultPath": "$.cluster",
      "OutputPath": "$.cluster",
      "Next": "Add_Steps_Parallel"
    },
    "Fail_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
        "Message.$": "$.Cause"
      },
      "Next": "Terminate_Cluster"
    },
    "Add_Steps_Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Step_One",
          "States": {
            "Step_One": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The first step",
                  "ActionOnFailure": "TERMINATE_CLUSTER",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "cluster",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.dynamicAllocation.enabled=true",
                      "--conf",
                      "maximizeResourceAllocation=true",
                      "--conf",
                      "spark.shuffle.service.enabled=true",
                      "--py-files",
                      "s3://prefix/prefix/pythonfile.py",
                      "s3://prefix/prefix/pythonfile.py"
                    ]
                  }
                }
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 1,
                  "BackoffRate": 2.5
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "ResultPath": "$.err_mgs",
                  "Next": "Fail_SNS"
                }
              ],
              "ResultPath": "$.step1",
              "Next": "Terminate_Cluster_1"
            },
            "Fail_SNS": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
                "Message.$": "$.err_mgs.Cause"
              },
              "ResultPath": "$.fail_cluster",
              "Next": "Terminate_Cluster_1"
            },
            "Terminate_Cluster_1": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Step_Two",
          "States": {
            "Step_Two": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The second step",
                  "ActionOnFailure": "TERMINATE_CLUSTER",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "spark-submit",
                      "--deploy-mode",
                      "cluster",
                      "--master",
                      "yarn",
                      "--conf",
                      "spark.dynamicAllocation.enabled=true",
                      "--conf",
                      "maximizeResourceAllocation=true",
                      "--conf",
                      "spark.shuffle.service.enabled=true",
                      "--py-files",
                      "s3://prefix/prefix/pythonfile.py",
                      "s3://prefix/prefix/pythonfile.py"
                    ]
                  }
                }
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 1,
                  "BackoffRate": 2.5
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": [
                    "States.ALL"
                  ],
                  "ResultPath": "$.err_mgs_1",
                  "Next": "Fail_SNS_1"
                }
              ],
              "ResultPath": "$.step2",
              "Next": "Terminate_Cluster_2"
            },
            "Fail_SNS_1": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "TopicArn": "arn:aws:sns:us-west-2:919490798061:rsac_error_notification",
                "Message.$": "$.err_mgs_1.Cause"
              },
              "ResultPath": "$.fail_cluster_1",
              "Next": "Terminate_Cluster_2"
            },
            "Terminate_Cluster_2": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.steps",
      "Next": "Terminate_Cluster"
    },
    "Terminate_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId"
      },
      "End": true
    }
  }
}

В этом сценарии (или) AWS Язык конечного автомата функции шага, при создании кластера я упомянул StepConcurrencyLevel как 2 и добавил 2 задания спарк как шаги ниже t кластер.

Когда я запустил этот сценарий в пошаговой функции, я смог организовать кластер и выполнить шаги для одновременного выполнения 2 шагов в кластере без прямой настройки его в консоли AWS EMR (или) через AWS CLI (или) даже через BOTO3.

Я просто использовал язык конечного автомата, чтобы выполнить оркестровку одновременного выполнения двух шагов в одном кластере в AWS Step Function без помощи других служб, таких как lambda или livy API или BOTO3 et c. ..

Вот как выглядит блок-схема: AWS Шаг Функция Рабочий процесс для одновременного выполнения шага

Чтобы быть более точным в том, куда я вставил StepConcurrencyLevel в выше State Machine Язык здесь:

"Create_A_Cluster": {
  "Type": "Task",
  "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
  "Parameters": {
    "Name": "WorkflowCluster",
    "StepConcurrencyLevel": 2,
    "Tags": [
      {
        "Key": "Description",
        "Value": "process"
      },

Под Create_A_Cluster .

Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...