Note
Вызов getModificationTime
для каждой строки вашего DataFrame будет влиять на производительность.
Изменен ваш код для одноразовой выборки метаданных файла и сохранен в files:Map[String,Long]
, создан UDF input_file_modification_time
для выборки данные с карты [String, Long].
Пожалуйста, проверьте код ниже.
scala> val df = spark.read.format("parquet").load("/tmp/par")
df: org.apache.spark.sql.DataFrame = [id: int]
scala> :paste
// Entering paste mode (ctrl-D to finish)
def getModificationTime(path: String): Long = {
FileSystem.get(spark.sparkContext.hadoopConfiguration)
.getFileStatus(new org.apache.hadoop.fs.Path(path))
.getModificationTime()
}
// Exiting paste mode, now interpreting.
getModificationTime: (path: String)Long
scala> implicit val files = df.inputFiles.flatMap(name => Map(name -> getModificationTime(name))).toMap
files: scala.collection.immutable.Map[String,Long] = Map(file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_2.snappy.parquet -> 1588080295000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_3.snappy.parquet -> 1588080299000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_4.snappy.parquet -> 1588080302000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000.snappy.parquet -> 1588071322000)
scala> :paste
// Entering paste mode (ctrl-D to finish)
def getTime(fileName:String)(implicit files: Map[String,Long]): Long = {
files.getOrElse(fileName,0L)
}
// Exiting paste mode, now interpreting.
getTime: (fileName: String)(implicit files: Map[String,Long])Long
scala> val input_file_modification_time = udf(getTime _)
input_file_modification_time: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))
scala> df.withColumn("createdDate",input_file_modification_time(input_file_name)).show
+---+-------------+
| id| createdDate|
+---+-------------+
| 1|1588080295000|
| 2|1588080295000|
| 3|1588080295000|
| 4|1588080295000|
| 5|1588080295000|
| 6|1588080295000|
| 7|1588080295000|
| 8|1588080295000|
| 9|1588080295000|
| 10|1588080295000|
| 11|1588080295000|
| 12|1588080295000|
| 13|1588080295000|
| 14|1588080295000|
| 15|1588080295000|
| 16|1588080295000|
| 17|1588080295000|
| 18|1588080295000|
| 19|1588080295000|
| 20|1588080295000|
+---+-------------+
only showing top 20 rows
scala>