У меня есть небольшой вспомогательный метод metadata
, вы можете напрямую вызывать объект DataFrame, например df.metadata
, он создаст DataFrame для доступных метаданных и вернет DataFrame обратно.
Метатолбины в окончательном варианте DataFrame
- путь
- isDirectory
- длина - будет отображаться в удобочитаемом формате 47 байтов
- репликация
- blockSize - будет отображаться в удобочитаемом формате 47 байтов.
- ModificationTime - будет преобразовано из unix времени в обычное время даты. *
- группа
- разрешение
- isSymlink
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
// Storing Metadata
case class FileMetaData(path: String,
isDirectory:Boolean,
length:String,
replication:Int,
blockSize:String,
modificationTime: String,
accessTime:String ,
owner:String ,
group:String ,
permission:String,
isSymlink:Boolean)
object FileMetaData {
def apply(lfs: LocatedFileStatus):FileMetaData = {
new FileMetaData(
path= lfs.getPath.toString,
isDirectory=lfs.isDirectory,
length=FileUtils.byteCountToDisplaySize(lfs.getLen),
replication=lfs.getReplication,
blockSize=FileUtils.byteCountToDisplaySize(lfs.getBlockSize),
modificationTime=new DateTime(lfs.getModificationTime).toString,
accessTime=new DateTime(lfs.getAccessTime).toString ,
owner=lfs.getOwner ,
group=lfs.getGroup ,
permission=lfs.getPermission.toString,
isSymlink=lfs.isSymlink
)
}
}
// Convert RemoteIterator to Scala Iterator.
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
override def hasNext: Boolean = remoteIterator.hasNext
override def next(): T = remoteIterator.next()
}
wrapper(remoteIterator)
}
// Using this we can call metadata method on df - like df.metadata.
implicit class MetaData(df: DataFrame) {
def metadata = {
import df.sparkSession.implicits._
df.inputFiles.map(new Path(_))
.flatMap{
FileSystem
.get(df.sparkSession.sparkContext.hadoopConfiguration)
.listLocatedStatus(_)
.toList
}
.map(FileMetaData(_)).toList.toDF
}
}
// Exiting paste mode, now interpreting.
warning: there was one feature warning; re-run with -feature for details
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
defined class FileMetaData
defined object FileMetaData
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
defined class MetaData
scala> val df = spark.read.format("json").load("/tmp/data")
df: org.apache.spark.sql.DataFrame = [json_data: struct<value: string>]
scala> df.show(false)
+------------------+
|json_data |
+------------------+
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
+------------------+
scala>
Вывод метаданных DataFrame
scala> df.metadata.show(false)
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|path |isDirectory|length |replication|blockSize|modificationTime |accessTime |owner |group|permission|isSymlink|
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|file:/tmp/data/fileB.json|false |47 bytes|1 |32 MB |2020-04-25T13:47:00.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false |
|file:/tmp/data/fileC.json|false |47 bytes|1 |32 MB |2020-04-25T13:47:10.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false |
|file:/tmp/data/fileA.json|false |47 bytes|1 |32 MB |2020-04-25T11:35:12.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false |
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+