AWS клей - создание секционированных данных в корзине S3 - PullRequest
0 голосов
/ 02 марта 2020

У меня есть очень простой скрипт Glue, который читает несколько файлов паркета в корзине s3 и пытается разделить его для более быстрых запросов, используя AWS Афина.

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val dyf = glueContext.getSourceWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://parquet-not-partitioned/")), format = "parquet" ).getDynamicFrame()
    var newDF = dyf.toDF()
    newDF = newDF.withColumn("year", year(col("Test_time"))).withColumn("month", month(col("Test_time"))).withColumn("day", dayofmonth(col("Test_time")))
    val partitioned_DF =  newDF.repartition(col("year"), col("month"), col("day"), col("Test"))
    val timestamped = DynamicFrame(partitioned_DF, glueContext)
    val datasink4 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://parquet_partiotioned/", "partitionKeys" -> Seq("year", "month", "day","Device_Under_Test"))), transformationContext = "datasink4", format = "glueparquet", formatOptions = JsonOptions(Map("compression" -> "snappy", "blockSize" -> 134217728, "pageSize" -> 1048576 ))).writeDynamicFrame(timestamped)
    Job.commit()
  }
}

Обратите внимание: я перераспределяю на основе некоторые столбцы и повторное разбиение его во время записи в S3, чтобы количество создаваемых файлов было меньше.

Программа работает, если я пропущу шаг «перераспределения». Но это приводит к 70К количеству файлов размером 50К. Это делает запросы athena очень медленными.

С шагом репатриации я получаю ошибку, упомянутую ниже:

org.apache.spark.SparkException Job aborted due to stage failure:
Task 119 in stage 2.0 failed 4 times, most recent failure:
Lost task 119.3 in stage 2.0 (TID 2205, xx.xx-1.compute.internal, executor 89): ExecutorLostFailure (executor 89 exited caused by one of the running tasks) Reason:
Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

Может кто-то поделиться какой-то инновационной идеей разбить и записать данные, не генерируя много файлы небольшого размера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...