На самом деле, PortableDataStream
является Сериализуемым. Вот для чего он предназначен. Тем не менее, open()
возвращает простое DataInputStream
(HdfsDataInputStream
в вашем случае, потому что ваш файл находится в HDFS), которое не Serializable, следовательно, вы получаете ошибку.
На самом деле, когда вы открываете PortableDataStream
, вам просто нужно сразу прочитать данные. В Scala вы можете использовать scala.io.Source.fromInputStream
:
val data : RDD[Array[String]] = sc
.binaryFiles("path/.../")
.map{ case (fileName, pds) => {
scala.io.Source.fromInputStream(pds.open())
.getLines().toArray
}}
Этот код предполагает, что данные являются текстовыми. Если это не так, вы можете адаптировать его для чтения любых двоичных данных. Вот пример для создания последовательности байтов, которую вы можете обработать так, как вы хотите.
val rdd : RDD[Seq[Byte]] = sc.binaryFiles("...")
.map{ case (file, pds) => {
val dis = pds.open()
val bytes = Array.ofDim[Byte](1024)
val all = scala.collection.mutable.ArrayBuffer[Byte]()
while( dis.read(bytes) != -1) {
all ++= bytes
}
all.toSeq
}}
См. javadoc из DataInputStream
для получения дополнительных возможностей. Например, он обладает readLong
, readDouble
(и т. Д.) Методами.