Scala: как получить содержимое экземпляра PortableDataStream из RDD - PullRequest
0 голосов
/ 27 марта 2019

Поскольку я хочу извлечь данные из двоичных файлов, я читаю файлы, используя val dataRDD = sc.binaryRecord("Path") Я получаю результат как org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]

Я хочу извлечь содержимое моих файлов, которое имеет форму PortableDataStream

Для этого я пытался: val data = dataRDD.map(x => x._2.open()).collect(), но я получаю следующую ошибку: java.io.NotSerializableException:org.apache.hadoop.hdfs.client.HdfsDataInputStream

Если у вас есть идея, как мне решить мою проблему, ПОМОГИТЕ!

Большое спасибо заранее.

1 Ответ

0 голосов
/ 27 марта 2019

На самом деле, PortableDataStream является Сериализуемым. Вот для чего он предназначен. Тем не менее, open() возвращает простое DataInputStream (HdfsDataInputStream в вашем случае, потому что ваш файл находится в HDFS), которое не Serializable, следовательно, вы получаете ошибку.

На самом деле, когда вы открываете PortableDataStream, вам просто нужно сразу прочитать данные. В Scala вы можете использовать scala.io.Source.fromInputStream:

val data : RDD[Array[String]] = sc
    .binaryFiles("path/.../")
    .map{ case (fileName, pds) => {
        scala.io.Source.fromInputStream(pds.open())
            .getLines().toArray
    }}

Этот код предполагает, что данные являются текстовыми. Если это не так, вы можете адаптировать его для чтения любых двоичных данных. Вот пример для создания последовательности байтов, которую вы можете обработать так, как вы хотите.

val rdd : RDD[Seq[Byte]] = sc.binaryFiles("...")
    .map{ case (file, pds) => {
        val dis = pds.open()
        val bytes = Array.ofDim[Byte](1024)
        val all = scala.collection.mutable.ArrayBuffer[Byte]()
        while( dis.read(bytes) != -1) {
            all ++= bytes
        }
        all.toSeq
    }}

См. javadoc из DataInputStream для получения дополнительных возможностей. Например, он обладает readLong, readDouble (и т. Д.) Методами.

...