Как мне убедиться, что мой установочный код Apache Spark запускается только один раз? - PullRequest
4 голосов
/ 04 июля 2019

Я пишу задание Spark в Scala, которое читает файлы паркета на S3, выполняет некоторые простые преобразования, а затем сохраняет их в экземпляре DynamoDB. Каждый раз, когда он запускается, нам нужно создавать новую таблицу в Dynamo, поэтому я написал лямбда-функцию, которая отвечает за создание таблицы. Первое, что делает мое задание Spark, - это генерирует имя таблицы, вызывает мою функцию Lambda (передает ей новое имя таблицы), ждет создания таблицы и затем продолжает работу с шагами ETL.

Однако похоже, что моя лямбда-функция последовательно вызывается дважды. Я не могу этого объяснить. Вот пример кода:

def main(spark: SparkSession, pathToParquet: String) {

  // generate a unique table name
  val tableName = generateTableName()

  // call the lambda function
  val result = callLambdaFunction(tableName)

  // wait for the table to be created
  waitForTableCreation(tableName)

  // normal ETL pipeline
  var parquetRDD = spark.read.parquet(pathToParquet)
  val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
  transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
  spark.sparkContext.stop()
}

Код для ожидания создания таблицы довольно прост, как вы можете видеть:

def waitForTableCreation(tableName: String) {
  val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
  val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
  try {
    waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
  } catch {
      case ex: WaiterTimedOutException =>
        LOGGER.error("Timed out waiting to create table: " + tableName)
        throw ex
      case t: Throwable => throw t
  }
}

И лямбда-вызов одинаково прост:

def callLambdaFunction(tableName: String) {
  val myLambda = LambdaInvokerFactory.builder()
    .lambdaClient(AWSLambdaClientBuilder.defaultClient)
    .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
    .build(classOf[MyLambdaContract])
  myLambda.invoke(new MyLambdaInput(tableName))
}

Как я уже сказал, когда я запускаю spark-submit для этого кода, он определенно попадает в функцию Lambda. Но я не могу объяснить, почему это поражает его дважды. В результате я получаю две таблицы, подготовленные в DynamoDB.

Шаг ожидания также кажется неудачным в контексте выполнения этого задания Spark. Но когда я тестирую мой код ожидания, он, кажется, работает нормально сам по себе. Он успешно блокируется, пока стол не будет готов.

Сначала я предположил, что, возможно, spark-submit отправлял этот код всем рабочим узлам, и они независимо выполняли все это. Изначально у меня был кластер Spark с 1 мастером и 2 рабочими. Однако я проверил это на другом кластере с 1 мастером и 5 рабочими, и там он снова дважды активировал функцию Lambda, а затем, по-видимому, не смог дождаться создания таблицы, поскольку он умирает вскоре после вызова Lambdas.

Кто-нибудь знает, что может делать Спарк? Я что-то упускаю из виду?

ОБНОВЛЕНИЕ: Вот мои спаржи-подающие аргументы, которые видны на вкладке Шаги EMR.

spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3: //my-bucket/my-spark-job.jar

А вот код для моей getConfiguration функции:

def getConfiguration(tableName: String) : JobConf = {
  val conf = new Configuration()
  conf.set("dynamodb.servicename", "dynamodb")
  conf.set("dynamodb.input.tableName", tableName)
  conf.set("dynamodb.output.tableName", tableName)
  conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
  conf.set("dynamodb.regionid", "us-east-1")
  conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  new JobConf(conf)
}

Также здесь есть Gist , содержащий некоторые журналы исключений, которые я вижу, когда пытаюсь запустить это.

Ответы [ 3 ]

3 голосов
/ 10 июля 2019

Я столкнулся с той же проблемой в режиме кластера (v2.4.0). Я обошел его, запустив мои приложения программно, используя SparkLauncher вместо использования spark-submit.sh. Вы можете переместить свою лямбда-логику в основной метод, который запускает ваше искровое приложение, например:

def main(args: Array[String]) = {
    // generate a unique table name
    val tableName = generateTableName()

    // call the lambda function
    val result = callLambdaFunction(tableName)

    // wait for the table to be created
    waitForTableCreation(tableName)

    val latch = new CountDownLatch(1);

    val handle = new SparkLauncher(env)
        .setAppResource("/path/to/spark-app.jar")
        .setMainClass("com.company.SparkApp")
        .setMaster("yarn")
        .setDeployMode("cluster")
        .setConf("spark.executor.instances", "2")
        .setConf("spark.executor.cores", "2")
        // other conf ... 
        .setVerbose(true)
        .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = {
                latch.countDown()
            }

            override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {

            }
        })  

    println("app is launching...")
    latch.await()
    println("app exited")
}
3 голосов
/ 09 июля 2019

Спасибо @soapergem за добавление регистрации и опций.Я добавляю ответ (пробный), так как он может быть немного длиннее комментария:)

Подводя итог:

  • ничего странного с spark-submit и конфигурациейпараметры
  • в https://gist.github.com/soapergem/6b379b5a9092dcd43777bdec8dee65a8#file-stderr-log вы можете видеть, что приложение выполняется дважды.Он дважды переходит из состояния ПРИНЯТО в РАБОТУ.И это соответствует настройкам по умолчанию EMR ( Как предотвратить повторение шага EMR Spark? ).Чтобы убедиться в этом, вы можете проверить, есть ли у вас 2 таблицы, созданные после выполнения шага (я предполагаю, что здесь вы генерируете таблицы с динамическими именами; другое имя для выполнения, которое в случае повторной попытки должно давать 2 разных имени)

Ваш последний вопрос:

Похоже, мой код мог бы работать, если бы я запускал его в режиме развертывания "клиент", а не в режиме развертывания "кластер"?Предлагает ли это кому-нибудь подсказки здесь?

Для получения дополнительной информации о разнице, пожалуйста, проверьте https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html В вашем случае, похоже, что машина, выполняющая spark-submit в режиме клиента, имеетдругие политики IAM, чем поток работ EMR.Здесь я предполагаю, что вашей роли в рабочем процессе не разрешено dynamodb:Describe*, и поэтому вы получаете исключение с 500 code (из вашей сущности):

Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)

Чтобы подтвердить эту гипотезу, вывыполните свою часть, создав таблицу и ожидая создания локально (здесь нет кода Spark, просто простая команда java вашей основной функции) и:

  • для первого выполнения убедитесь, что у вас есть все разрешения,IMO это будет dynamodb:Describe* на Resources: * (если это причина, AFAIK, вы должны использовать что-то Resources: Test_Emr* в производстве по принципу наименьших привилегий)
  • для 2-го выполнения удалите dynamodb:Describe* и проверьте,вы получаете ту же трассировку стека, что и в Gist
2 голосов
/ 13 июля 2019

ваше задание на искру начинается до того, как таблица на самом деле создана, потому что определение операций по одному не означает, что они будут ждать до завершения предыдущего

вам нужно изменить код, чтобы блок, связанный с искрой, былначиная с создания таблицы, и для ее достижения вы должны либо использовать for-comprehension, обеспечивающий завершение каждого шага, либо поместить свой искровой конвейер в обратный вызов waiter, вызываемый после создания таблицы (если у вас есть,трудно сказать)

вы также можете использовать andThen или просто map

главное, что все строки кода, написанные в вашем основном, выполняются одна за другой немедленно не дожидаясь окончания предыдущего

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...