Как получить метаданные файлов при извлечении данных из HDFS? - PullRequest
1 голос
/ 20 апреля 2020

Я запросил данные из HDFS и хотел бы получить метаданные файлов, из которых они были прочитаны. Это позволит мне создавать отчеты, которые будут выглядеть на основе доступных данных на данный момент.

Я нашел решение, которое заключается в использовании org.apache.hadoop.fs.FileSystem для получения списка всех файлов. Я знаю правило разбиения и могу построить отображение row -> meta на основе полученного списка.

Но это решение кажется трудным для реализации и поддержки. Может быть, есть более простые способы достижения того же результата?

Ответы [ 3 ]

2 голосов
/ 25 апреля 2020

У меня есть небольшой вспомогательный метод metadata, вы можете напрямую вызывать объект DataFrame, например df.metadata, он создаст DataFrame для доступных метаданных и вернет DataFrame обратно.

Метатолбины в окончательном варианте DataFrame

  • путь
  • isDirectory
  • длина - будет отображаться в удобочитаемом формате 47 байтов
  • репликация
  • blockSize - будет отображаться в удобочитаемом формате 47 байтов.
  • ModificationTime - будет преобразовано из unix времени в обычное время даты. *
  • группа
  • разрешение
  • isSymlink

scala> :paste
// Entering paste mode (ctrl-D to finish)

  import org.joda.time.DateTime
  import org.apache.commons.io.FileUtils
  import org.apache.spark.sql.DataFrame
  import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}

  // Storing Metadata
  case class FileMetaData(path: String,
                          isDirectory:Boolean,
                          length:String,
                          replication:Int,
                          blockSize:String,
                          modificationTime: String,
                          accessTime:String ,
                          owner:String ,
                          group:String ,
                          permission:String,
                          isSymlink:Boolean)

  object FileMetaData {

    def apply(lfs: LocatedFileStatus):FileMetaData = {        
      new FileMetaData(
        path= lfs.getPath.toString,
        isDirectory=lfs.isDirectory,
        length=FileUtils.byteCountToDisplaySize(lfs.getLen),
        replication=lfs.getReplication,
        blockSize=FileUtils.byteCountToDisplaySize(lfs.getBlockSize),
        modificationTime=new DateTime(lfs.getModificationTime).toString,
        accessTime=new DateTime(lfs.getAccessTime).toString ,
        owner=lfs.getOwner ,
        group=lfs.getGroup ,
        permission=lfs.getPermission.toString,
        isSymlink=lfs.isSymlink
      )
    }
  }

  // Convert RemoteIterator to Scala Iterator.
  implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
  }

  // Using this we can call metadata method on df - like df.metadata.
  implicit class MetaData(df: DataFrame) {
    def metadata =  {
      import df.sparkSession.implicits._
       df.inputFiles.map(new Path(_))
         .flatMap{
           FileSystem
             .get(df.sparkSession.sparkContext.hadoopConfiguration)
             .listLocatedStatus(_)
             .toList
       }
         .map(FileMetaData(_)).toList.toDF
     }
  }

// Exiting paste mode, now interpreting.

warning: there was one feature warning; re-run with -feature for details
import org.joda.time.DateTime
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
defined class FileMetaData
defined object FileMetaData
convertToScalaIterator: [T](remoteIterator: org.apache.hadoop.fs.RemoteIterator[T])Iterator[T]
defined class MetaData

scala> val df = spark.read.format("json").load("/tmp/data")
df: org.apache.spark.sql.DataFrame = [json_data: struct<value: string>]



scala> df.show(false)
+------------------+
|json_data         |
+------------------+
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
|[{"a":1} ,{"b":2}]|
+------------------+

scala>

Вывод метаданных DataFrame

scala> df.metadata.show(false)

+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|path                     |isDirectory|length  |replication|blockSize|modificationTime             |accessTime                   |owner   |group|permission|isSymlink|
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
|file:/tmp/data/fileB.json|false      |47 bytes|1          |32 MB    |2020-04-25T13:47:00.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
|file:/tmp/data/fileC.json|false      |47 bytes|1          |32 MB    |2020-04-25T13:47:10.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
|file:/tmp/data/fileA.json|false      |47 bytes|1          |32 MB    |2020-04-25T11:35:12.000+05:30|1970-01-01T05:30:00.000+05:30|srinivas|wheel|rw-r--r-- |false    |
+-------------------------+-----------+--------+-----------+---------+-----------------------------+-----------------------------+--------+-----+----------+---------+
1 голос
/ 23 апреля 2020

Самый простой способ сделать это с помощью spark udf input_file_name.

import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name()).repartition($"input_file_name")

def getMetadata(rdd: Iterator[Row]) = {
    val map = Map[String, Long]()
    val fs = FileSystem.get(new Configuration())
    rdd.map(row => {
                val path = row.getString(row.size -1)
                if(! map.contains(path)){
                    map.put(path,fs.listStatus(new Path(path))(0).getModificationTime())
                }
                Row.fromSeq(row.toSeq ++ Array[Any](map(path)))
            })
}

spark.createDataFrame(df.rdd.mapPartitions(getMetadata),df.schema.add("modified_ts", LongType)).show(10000,false)

Здесь modified_ts - это mtime для файла.

В зависимости от размера данных, вы также можете сделать это с помощью соединения. Логика c будет выглядеть примерно так:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._

val mtime =(path:String)=> FileSystem.get(new Configuration()).listStatus(new Path(path)).head.getModificationTime
val mtimeUDF = udf(mtime)

val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name())

val metadata_df = df.select($"input_file_name").distinct().withColumn("mtime", mtimeUDF($"input_file_name"))

val rows_with_metadata = df.join(metadata_df , "input_file_name")
rows_with_metadata.show(false)
0 голосов
/ 22 апреля 2020

Я могу сделать предположение, попробуйте отладить код:

hdfs debug computeMeta -block <block-file> -out <output-metadata-file>

вы можете найти вашу команду любезно посетите https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#verifyMeta

...