Я пишу задание Spark в Scala, которое читает файлы паркета на S3, выполняет некоторые простые преобразования, а затем сохраняет их в экземпляре DynamoDB. Каждый раз, когда он запускается, нам нужно создавать новую таблицу в Dynamo, поэтому я написал лямбда-функцию, которая отвечает за создание таблицы. Первое, что делает мое задание Spark, - это генерирует имя таблицы, вызывает мою функцию Lambda (передает ей новое имя таблицы), ждет создания таблицы и затем продолжает работу с шагами ETL.
Однако похоже, что моя лямбда-функция последовательно вызывается дважды. Я не могу этого объяснить. Вот пример кода:
def main(spark: SparkSession, pathToParquet: String) {
// generate a unique table name
val tableName = generateTableName()
// call the lambda function
val result = callLambdaFunction(tableName)
// wait for the table to be created
waitForTableCreation(tableName)
// normal ETL pipeline
var parquetRDD = spark.read.parquet(pathToParquet)
val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
transformedRDD.saveAsHadoopDataset(getConfiguration(tableName))
spark.sparkContext.stop()
}
Код для ожидания создания таблицы довольно прост, как вы можете видеть:
def waitForTableCreation(tableName: String) {
val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
try {
waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
} catch {
case ex: WaiterTimedOutException =>
LOGGER.error("Timed out waiting to create table: " + tableName)
throw ex
case t: Throwable => throw t
}
}
И лямбда-вызов одинаково прост:
def callLambdaFunction(tableName: String) {
val myLambda = LambdaInvokerFactory.builder()
.lambdaClient(AWSLambdaClientBuilder.defaultClient)
.lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
.build(classOf[MyLambdaContract])
myLambda.invoke(new MyLambdaInput(tableName))
}
Как я уже сказал, когда я запускаю spark-submit
для этого кода, он определенно попадает в функцию Lambda. Но я не могу объяснить, почему это поражает его дважды. В результате я получаю две таблицы, подготовленные в DynamoDB.
Шаг ожидания также кажется неудачным в контексте выполнения этого задания Spark. Но когда я тестирую мой код ожидания, он, кажется, работает нормально сам по себе. Он успешно блокируется, пока стол не будет готов.
Сначала я предположил, что, возможно, spark-submit
отправлял этот код всем рабочим узлам, и они независимо выполняли все это. Изначально у меня был кластер Spark с 1 мастером и 2 рабочими. Однако я проверил это на другом кластере с 1 мастером и 5 рабочими, и там он снова дважды активировал функцию Lambda, а затем, по-видимому, не смог дождаться создания таблицы, поскольку он умирает вскоре после вызова Lambdas.
Кто-нибудь знает, что может делать Спарк? Я что-то упускаю из виду?
ОБНОВЛЕНИЕ: Вот мои спаржи-подающие аргументы, которые видны на вкладке Шаги EMR.
spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3: //my-bucket/my-spark-job.jar
А вот код для моей getConfiguration
функции:
def getConfiguration(tableName: String) : JobConf = {
val conf = new Configuration()
conf.set("dynamodb.servicename", "dynamodb")
conf.set("dynamodb.input.tableName", tableName)
conf.set("dynamodb.output.tableName", tableName)
conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
conf.set("dynamodb.regionid", "us-east-1")
conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
new JobConf(conf)
}
Также здесь есть Gist , содержащий некоторые журналы исключений, которые я вижу, когда пытаюсь запустить это.