Как сохранить BufferedImage RDD в виде файла HDFS - PullRequest
0 голосов
/ 04 июля 2019

У меня есть требование прочитать изображение из HDFS, выполнить некоторую обработку и сохранить изображение обратно в HDFS. Эта обработка должна быть сделана в искре. Я читаю файлы изображений как sc.binaryFiles, а затем преобразую их в буферизованные изображения и выполняю некоторые операции. Но при попытке сохранить RDD [BufferedImage] в FSDataOutputStream

появляется ошибка «Задача не сериализуема».
    //read binary files from RDD
    val images = sc.binaryFiles("/tmp/images/")
    //images: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = /tmp/images/ 

    //get BufferedImageRDD
    val bufImages = images.map(x => ImageIO.read(x._2.open))
    //bufImages: org.apache.spark.rdd.RDD[java.awt.image.BufferedImage] = MapPartitionsRDD[1]


    //try saving in local directory
    bufImages.foreach(x => UtilImageIO.saveImage(x,"Mean3.jpg"))
    //success



    //try saving in hdfs

    val conf = new Configuration()
    val fileSystem = FileSystem.get(conf);
    val out = fileSystem.create(new Path("/tmp/img1.png"));
    //out: org.apache.hadoop.fs.FSDataOutputStream = org.apache.hadoop.hdfs.client.HdfsDataOutputStream@440f55ad


    bufImages.foreach(x => ImageIO.write(x,"png", out))

Приведенный выше код выдает следующую ошибку

    org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:926)
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
      at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
      ... 49 elided
    Caused by: java.io.NotSerializableException: org.apache.hadoop.hdfs.client.HdfsDataOutputStream
    Serialization stack:
        - object not serializable (class: org.apache.hadoop.hdfs.client.HdfsDataOutputStream, value: org.apache.hadoop.hdfs.client.HdfsDataOutputStream@440f55ad)
        - field (class: $iw, name: out, type: class org.apache.hadoop.fs.FSDataOutputStream)
        - object (class $iw, $iw@13c2b782)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@28aedf6e)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@14d0c3ff)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@48eb05e9)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@6b9ba1a6)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@53d519cb)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@45d7e92)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@79c1301b)
        - field (class: $line49.$read, name: $iw, type: class $iw)
        - object (class $line49.$read, $line49.$read@1a714d1)
        - field (class: $iw, name: $line49$read, type: class $line49.$read)
        - object (class $iw, $iw@79ef07b3)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@2dd246ff)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
      at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
      ... 58 more

Пожалуйста, дайте мне знать, если есть какой-то конкретный способ, которым это может быть достигнуто.

1 Ответ

0 голосов
/ 04 июля 2019

Метод foreach на rdd требует только сериализации аргументов. Так что, просто написав оболочку для ImageIO.write (x, "png", out) с сериализуемым аргументом, я смог сделать эту работу.

...