У меня есть очень простой скрипт Glue, который читает несколько файлов паркета в корзине s3 и пытается разделить его для более быстрых запросов, используя AWS Афина.
object GlueApp {
def main(sysArgs: Array[String]) {
val spark: SparkContext = new SparkContext()
val glueContext: GlueContext = new GlueContext(spark)
// @params: [JOB_NAME]
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val dyf = glueContext.getSourceWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://parquet-not-partitioned/")), format = "parquet" ).getDynamicFrame()
var newDF = dyf.toDF()
newDF = newDF.withColumn("year", year(col("Test_time"))).withColumn("month", month(col("Test_time"))).withColumn("day", dayofmonth(col("Test_time")))
val partitioned_DF = newDF.repartition(col("year"), col("month"), col("day"), col("Test"))
val timestamped = DynamicFrame(partitioned_DF, glueContext)
val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://parquet_partiotioned/", "partitionKeys" -> Seq("year", "month", "day","Device_Under_Test"))), transformationContext = "datasink4", format = "glueparquet", formatOptions = JsonOptions(Map("compression" -> "snappy", "blockSize" -> 134217728, "pageSize" -> 1048576 ))).writeDynamicFrame(timestamped)
Job.commit()
}
}
Обратите внимание: я перераспределяю на основе некоторые столбцы и повторное разбиение его во время записи в S3, чтобы количество создаваемых файлов было меньше.
Программа работает, если я пропущу шаг «перераспределения». Но это приводит к 70К количеству файлов размером 50К. Это делает запросы athena очень медленными.
С шагом репатриации я получаю ошибку, упомянутую ниже:
org.apache.spark.SparkException Job aborted due to stage failure:
Task 119 in stage 2.0 failed 4 times, most recent failure:
Lost task 119.3 in stage 2.0 (TID 2205, xx.xx-1.compute.internal, executor 89): ExecutorLostFailure (executor 89 exited caused by one of the running tasks) Reason:
Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Может кто-то поделиться какой-то инновационной идеей разбить и записать данные, не генерируя много файлы небольшого размера.