Как эффективно читать / анализировать загрузки файлов .gz в папке s3 с помощью spark на EMR - PullRequest
1 голос
/ 15 апреля 2019

Я пытаюсь прочитать все файлы в каталоге на s3 через приложение spark, которое выполняется на EMR.

Данные хранятся в типичном формате, например "s3a: //Some/path/yyyy/mm/dd/hh/blah.gz"

Если я использую подстановочные знаки с глубокими вложениями (например, «s3a: //SomeBucket/SomeFolder/////*.gz»), производительность ужасна и занимает около 40 минут, чтобы прочитать несколько десятков тысяч маленьких сжатых файлов JSON. Это работает, но потерять 40 минут на тестирование кода - это очень плохо.

У меня есть два других подхода, которые, как мне показали мои исследования, гораздо более эффективны.

Используя библиотеку hadoop.fs (2.8.5), я пытаюсь прочитать каждый путь к файлу, который я предоставляю.

private def getEventDataHadoop(
    eventsFilePaths: RDD[String]
  )(implicit sqlContext: SQLContext): Try[RDD[String]] =
    Try(
      {
        val conf = sqlContext.sparkContext.hadoopConfiguration

        eventsFilePaths.map(eventsFilePath => {
          val p                            = new Path(eventsFilePath)
          val fs                           = p.getFileSystem(conf)
          val eventData: FSDataInputStream = fs.open(p)
          IOUtils.toString(eventData)
        })
      }
    )

Эти пути к файлам генерируются с помощью следующего кода:

private[disneystreaming] def generateInputBucketPaths(
    s3Protocol: String,
    bucketName: String,
    service: String,
    region: String,
    yearsMonths: Map[String, Set[String]]
  ): Try[Set[String]] =
    Try(
      {
        val days                         = 1 to 31
        val hours                        = 0 to 23
        val dateFormatter: Int => String = buildDateFormat("00")

        yearsMonths.flatMap { yearMonth: (String, Set[String]) =>
          for {
            month: String <- yearMonth._2
            day: Int      <- days
            hour: Int     <- hours
          } yield
            s"$s3Protocol$bucketName/$service/$region/${dateFormatter(yearMonth._1.toInt)}/${dateFormatter(month.toInt)}/" +
              s"${dateFormatter(day)}/${dateFormatter(hour)}/*.gz"
        }.toSet
      }
    )

Код hadoop.fs завершается ошибкой, поскольку класс Path не сериализуем. Я не могу думать о том, как я могу обойти это.

Так что это привело меня к другому подходу с использованием AmazonS3Client, где я просто прошу клиента указать мне все пути к файлам в папке (или префиксе), а затем проанализировать файлы в строку, которая, скорее всего, не будет выполнена из-за того, что они сжатая:

 private def getEventDataS3(bucketName: String, prefix: String)(
    implicit sqlContext: SQLContext
  ): Try[RDD[String]] =
    Try(
      {
        import com.amazonaws.services.s3._, model._
        import scala.collection.JavaConverters._

        val request = new ListObjectsRequest()
        request.setBucketName(bucketName)
        request.setPrefix(prefix)
        request.setMaxKeys(Integer.MAX_VALUE)
        val s3 = new AmazonS3Client(new ProfileCredentialsProvider("default"))

        val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
        sqlContext.sparkContext
          .parallelize(objs.getObjectSummaries.asScala.map(_.getKey).toList)
          .flatMap { key =>
            Source
              .fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream)
              .getLines()
          }
      }
    )

Этот код создает исключение NULL, поскольку профиль не может иметь значение NULL («java.lang.IllegalArgumentException: файл профиля не может быть NULL»). Помните, что этот код работает на EMR в AWS, так как мне предоставить требуемые учетные данные? Как другие люди, выполняющие задания зажигания в EMR, используют этот клиент?

Любая помощь в работе любого из этих подходов очень ценится.

1 Ответ

0 голосов
/ 15 апреля 2019

Путь можно сериализировать в более поздних выпусках Hadoop, потому что его полезно использовать в RDD Spark.До этого преобразуйте путь в URI, выполните маршаллинг и создайте новый путь из этого URI внутри вашего замыкания.

...