Как вызвать FileSystem из udf - PullRequest
0 голосов
/ 28 апреля 2020

То, что я ожидал

Цель состоит в том, чтобы добавить столбец с временем модификации к каждой DataFrame строке.

Учитывая

val data = spark.read.parquet("path").withColumn("input_file_name", input_file_name())

+----+------------------------+
| id |        input_file_name |
+----+------------------------+
|  1 | hdfs://path/part-00001 |
|  2 | hdfs://path/part-00001 |
|  3 | hdfs://path/part-00002 |
+----+------------------------+

Ожидается

+----+------------------------+
| id |      modification_time |
+----+------------------------+
|  1 | 2000-01-01Z00:00+00:00 |
|  2 | 2000-01-01Z00:00+00:00 |
|  3 | 2000-01-02Z00:00+00:00 |
+----+------------------------+

Что я пытался

Я написал функцию для получения времени модификации

def getModificationTime(path: String): Long = {
    FileSystem.get(spark.sparkContext.hadoopConfiguration)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

val modificationTime = getModificationTime("hdfs://srsdev/projects/khajiit/data/OfdCheques2/date=2020.02.01/part-00002-04b9e4c8-5916-4bb2-b9ff-757f843a0142.c000.snappy.parquet")

ificationTime: Long = 1580708401253

. .. но это не работает в запросе

def input_file_modification_time = udf((path: String) => getModificationTime(path))

data.select(input_file_modification_time($"input_file_name") as "modification_time").show(20, false)

org. apache .spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 54.0 не выполнено 4 раза, последний сбой : Потерянное задание 0.3 на этапе 54.0 (TID 408, srs-hdp-s1.dev.kontur.ru, исполнитель 3): org. apache .spark.SparkException: не удалось выполнить пользовательскую функцию ($ anonfun $ input_file_modification_time $ 1: (строка) => bigint)

Ответы [ 2 ]

1 голос
/ 28 апреля 2020

Note Вызов getModificationTime для каждой строки вашего DataFrame будет влиять на производительность.

Изменен ваш код для одноразовой выборки метаданных файла и сохранен в files:Map[String,Long], создан UDF input_file_modification_time для выборки данные с карты [String, Long].

Пожалуйста, проверьте код ниже.

scala> val df = spark.read.format("parquet").load("/tmp/par")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> :paste
// Entering paste mode (ctrl-D to finish)

def getModificationTime(path: String): Long = {
    FileSystem.get(spark.sparkContext.hadoopConfiguration)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

// Exiting paste mode, now interpreting.

getModificationTime: (path: String)Long

scala> implicit val files = df.inputFiles.flatMap(name => Map(name -> getModificationTime(name))).toMap
files: scala.collection.immutable.Map[String,Long] = Map(file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_2.snappy.parquet -> 1588080295000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_3.snappy.parquet -> 1588080299000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000_4.snappy.parquet -> 1588080302000, file:///tmp/par/part-00000-c6360540-c56d-48c4-8795-05a9c0ac4d18-c000.snappy.parquet -> 1588071322000)

scala> :paste
// Entering paste mode (ctrl-D to finish)

def getTime(fileName:String)(implicit files: Map[String,Long]): Long = {
 files.getOrElse(fileName,0L)
}

// Exiting paste mode, now interpreting.

getTime: (fileName: String)(implicit files: Map[String,Long])Long

scala> val input_file_modification_time = udf(getTime _)
input_file_modification_time: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))

scala> df.withColumn("createdDate",input_file_modification_time(input_file_name)).show
+---+-------------+
| id|  createdDate|
+---+-------------+
|  1|1588080295000|
|  2|1588080295000|
|  3|1588080295000|
|  4|1588080295000|
|  5|1588080295000|
|  6|1588080295000|
|  7|1588080295000|
|  8|1588080295000|
|  9|1588080295000|
| 10|1588080295000|
| 11|1588080295000|
| 12|1588080295000|
| 13|1588080295000|
| 14|1588080295000|
| 15|1588080295000|
| 16|1588080295000|
| 17|1588080295000|
| 18|1588080295000|
| 19|1588080295000|
| 20|1588080295000|
+---+-------------+
only showing top 20 rows


scala>

1 голос
/ 28 апреля 2020

Проблема в том, что spark является нулем в UDF, поскольку оно существует только в драйвере. Другая проблема в том, что hadoops Configuration не сериализуем, поэтому вы не можете легко заключить его в udf. Но есть обходной путь, использующий org.apache.spark.SerializableWritable:

import org.apache.spark.SerializableWritable
import org.apache.hadoop.conf.Configuration

val conf = new SerializableWritable(spark.sparkContext.hadoopConfiguration)

def getModificationTime(path: String, conf:SerializableWritable[Configuration]): Long = {
    org.apache.hadoop.fs.FileSystem.get(conf.value)
        .getFileStatus(new org.apache.hadoop.fs.Path(path))
        .getModificationTime()
}

def input_file_modification_time(conf:SerializableWritable[Configuration]) = udf((path: String) => getModificationTime(path,conf))

data.select(input_file_modification_time(conf)($"input_file_name") as "modification_time").show(20, false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...