Искра: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client - PullRequest
0 голосов
/ 15 мая 2019

Я пытаюсь прочитать большое количество больших файлов из S3, что занимает много времени, если выполняется как функция Dataframe.Итак, после этого post и связанных с ним gist я пытаюсь использовать RDD для чтения объектов s3 параллельно, как показано ниже

def dfFromS3Objects(s3: AmazonS3, bucket: String, prefix: String, pageLength: Int = 1000) = {
    import com.amazonaws.services.s3._
    import model._
    import spark.sqlContext.implicits._

    import scala.collection.JavaConversions._

    val request = new ListObjectsRequest()
    request.setBucketName(bucket)
    request.setPrefix(prefix)
    request.setMaxKeys(pageLength)

    val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.

    spark.sparkContext.parallelize(objs.getObjectSummaries.map(_.getKey).toList)
      .flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }.toDF()
  }

, который при тестировании заканчиваетсяс

Caused by: java.io.NotSerializableException: com.amazonaws.services.s3.AmazonS3Client
Serialization stack:
    - object not serializable (class: com.amazonaws.services.s3.AmazonS3Client, value: com.amazonaws.services.s3.AmazonS3Client@35c8be21)
    - field (class: de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, name: s3$1, type: interface com.amazonaws.services.s3.AmazonS3)
    - object (class de.smava.data.bards.anonymize.HistoricalBardAnonymization$$anonfun$dfFromS3Objects$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    ... 63 more

Я понимаю, что AmazonS3 объект, который я поставляю, должен быть отправлен исполнителям, следовательно, должен быть сериализуемым, но это из образца фрагмента, означающего, что кто-то заставил его работать, нужна помощь в вычислениичто мне здесь не хватает

1 Ответ

1 голос
/ 15 мая 2019

В сущности s3 определяется как метод, который создаст нового клиента для каждого вызова.Это не рекомендуется.Одним из способов решения этой проблемы является использование mapPartitions

spark
  .sparkContext
  .parallelize(objs.getObjectSummaries.map(_.getKey).toList)
  .mapPartitions { it =>
    val s3 = ... // init the client here
    it.flatMap { key => Source.fromInputStream(s3.getObject(bucket, key).getObjectContent: InputStream).getLines }
  }
  .toDF

Это все равно приведет к созданию нескольких клиентов на JVM, но, возможно, значительно меньше, чем версия, которая создает клиента для каждого объекта.Если вы хотите повторно использовать клиент между потоками внутри JVM, вы можете, например, обернуть его в объект верхнего уровня

object Foo {
  val s3 = ...
}

и использовать статическую конфигурацию для клиента.

...