Не удалось выполнить пользовательскую функцию на фрейме данных в Spark (Scala) - PullRequest
2 голосов
/ 01 апреля 2019

У меня есть датафрейм df, подобный следующему

+--------+--------------------+--------+------+
|      id|                path|somestff| hash1|
+--------+--------------------+--------+------+
|       1|/file/dirA/fileA.txt|      58| 65161|
|       2|/file/dirB/fileB.txt|      52| 65913|
|       3|/file/dirC/fileC.txt|      99|131073|
|       4|/file/dirF/fileD.txt|      46|196233|
+--------+--------------------+--------+------+

Одно примечание: / file / dir отличаются. Не все файлы хранятся в одном каталоге. На самом деле там сотни файлов в разных каталогах.

Здесь я хочу прочитать файл в пути к столбцу, подсчитать записи в файлах и записать результат подсчета строк в новый столбец кадра данных.

Я попробовал следующую функцию и udf:

def executeRowCount(fileCount: String): Long = {
  val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
  rowCount
}

val execUdf = udf(executeRowCount _)

df.withColumn("row_count", execUdf (col("path"))).show()

Это приводит к следующей ошибке

org.apache.spark.SparkException: Failed to execute user defined fu
nction($anonfun$1: (string) => bigint)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
        ... 19 more

Я пытался перебрать столбец при сборе, как

val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)

и здесь все отлично работает, но я хочу сохранить результат в df ...

Я пробовал несколько решений, но здесь я зашел в тупик.

Ответы [ 4 ]

2 голосов
/ 01 апреля 2019

Это не работает, поскольку фреймы данных могут быть созданы только в JVM драйвера, но код UDF выполняется в JVM исполнителя.Что вы можете сделать, это загрузить CSV-файлы в отдельный фрейм данных и обогатить данные столбцом имени файла:

val csvs = spark
 .read
 .format("csv")
 .load("/file/dir/")
 .withColumn("filename", input_file_name())

, а затем соединить исходный df в filename столбец

0 голосов
/ 04 апреля 2019

Я исправил эту проблему следующим образом:

val queue = df.select("path").as[String].collect()
val countResult = for (item <- queue) yield {
    val rowCount = (item, spark.read.format("csv").option("header", "false").load(item).count)
    rowCount
}

val df2 = spark.createDataFrame(countResult)

Впоследствии я присоединился к df с помощью df2 ...

Проблема здесь в том, что @ ollik1 упоминается в драйвере / работнике.архитектура на udfs.UDF не сериализуем, что мне нужно для функции spark.read.

0 голосов
/ 01 апреля 2019

Может быть что-то подобное?

  sqlContext
    .read
    .format("csv")
    .load("/tmp/input/")
    .withColumn("filename", input_file_name())
    .groupBy("filename")
    .agg(count("filename").as("record_count"))
    .show()
0 голосов
/ 01 апреля 2019

Как насчет? :

def executeRowCount = udf((fileCount: String) => {
  spark.read.format("csv").option("header", "false").load(fileCount).count
})

df.withColumn("row_count", executeRowCount(col("path"))).show()
...