Можно ли дождаться завершения кластера EMR? - PullRequest
2 голосов
/ 13 июня 2019

Я пытаюсь написать компонент, который запустит кластер EMR, запустит конвейер Spark на этом кластере, а затем завершит работу этого кластера после его завершения.

Я дошел до создания кластера и установки разрешений, позволяющих рабочим компьютерам моего основного кластера запускать кластеры EMR.Однако я борюсь с отладкой созданного кластера и жду, пока конвейер завершится.Вот код, который у меня есть сейчас.Заметьте, я использую Spark Scala, но это очень близко к стандартному Java-коду:

val runSparkJob = new StepConfig()
  .withName("Run Pipeline")
  .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
  .withHadoopJarStep(
    new HadoopJarStepConfig()
      .withJar("/path/to/jar")
      .withArgs(
        "spark-submit",
        "etc..."
      )
  )

// Create a cluster and run the Spark job on it
val clusterName = "REDACTED Cluster"
val createClusterRequest =
  new RunJobFlowRequest()
    .withName(clusterName)
    .withReleaseLabel(Configs.EMR_RELEASE_LABEL)
    .withSteps(enableDebugging, runSparkJob)
    .withApplications(new Application().withName("Spark"))
    .withLogUri(Configs.LOG_URI_PREFIX)
    .withServiceRole(Configs.SERVICE_ROLE)
    .withJobFlowRole(Configs.JOB_FLOW_ROLE)
    .withInstances(
      new JobFlowInstancesConfig()
        .withEc2SubnetId(Configs.SUBNET)
        .withInstanceCount(Configs.INSTANCE_COUNT)
        .withKeepJobFlowAliveWhenNoSteps(false)
        .withMasterInstanceType(Configs.MASTER_INSTANCE_TYPE)
        .withSlaveInstanceType(Configs.SLAVE_INSTANCE_TYPE)
    )

val newCluster = emr.runJobFlow(createClusterRequest)

У меня есть два конкретных вопроса:

  1. Звонок на emr.runJobFlow возвращается сразу после отправки результата.Можно ли как-нибудь блокировать его до тех пор, пока кластер не будет выключен, или иначе дождаться завершения рабочего процесса?

  2. Мой кластер фактически не запускается, и когда я перехожу к представлению AWS Console -> EMR -> Events, я вижу сбой:

    Amazon EMR Cluster j-XXX (REDACTED...) has terminated with errors at 2019-06-13 19:50 UTC with a reason of VALIDATION_ERROR.

Есть ли какой-нибудь способ, которым я могу получить эту ошибку программно в моем приложении Java / Scala?

1 Ответ

2 голосов
/ 14 июня 2019

Да, очень возможно дождаться завершения кластера EMR.

Существуют официанты , которые будут блокировать выполнение, пока кластер (т. Е. Поток заданий) не достигнет определенного уровня.state.

val newCluster = emr.runJobFlow(createClusterRequest);
val describeRequest = new DescribeClusterRequest()
    .withClusterId(newCluster.getClusterId())

// Wait until terminated
emr.waiters().clusterTerminated().run(new WaiterParameters(describeRequest))

Кроме того, если вы хотите получить состояние кластера (т. е. поток заданий), вы можете вызвать функцию descriptionCluster клиента EMR.Ознакомьтесь со связанной документацией, так как вы можете получить информацию о состоянии и состоянии кластера, чтобы определить, успешен он или нет.

val result = emr.describeCluster(describeRequest)

Примечание: не самый лучший Java-er, так что вышеизложенное - моя лучшая догадка и то, как он будет работать, основываясь на документации, но я не проверял выше.

...