У меня возникла проблема с длинной, сложной искровой работой, которая содержит udf.
Проблема, с которой я столкнулся, заключается в том, что udf не вызывается должным образом, хотя сообщения об ошибке нет.
Я знаю, что он не вызывается должным образом, потому что вывод записывается, только то, что udf должен был рассчитать, равно NULL , и при локальной отладке не отображаются операторы печати.
Единственная причина в том, что этот код ранее работал с использованием разных входных данных, что означает, что ошибка должна иметь отношение к вводу.
Изменение входных данных в основном означает, что используются разные имена столбцов, которые рассматриваются в коде.
Операторы печати выполняются с первого, «рабочего» ввода.
Оба входа создаются с использованием одной и той же серии шагов из одной и той же базы данных, и при проверке не возникает проблем ни с одним из них.
Я никогда раньше не сталкивался с подобным поведением, и любые указания на то, что может его вызвать, будут оценены.
Код монолитный и негибкий - я работаю над рефакторингом, но разбить его не так просто. Это краткая версия происходящего:
package mypackage
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types._
import scala.collection.{Map => SMap}
object MyObject {
def main(args: Array[String]){
val spark: SparkSession = SparkSession.builder()
.appName("my app")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
val bigInput = spark.read.parquet("inputname.parquet")
val reference_table = spark.read.parquet("reference_table.parquet")
val exchange_rate = spark.read.parquet("reference_table.parquet")
val bigInput2 = bigInput
.filter($"column1" === "condition1")
.join(joinargs)
.drop(dropargs)
val bigInput3 = bigInput
.filter($"column2" === "condition2")
.join(joinargs)
.drop(dropargs)
<continue for many lines...>
def mapper1(
arg1: String,
arg2: Double,
arg3: Integer
): List[Double]{
exchange_rate.map(
List(idx1, idx2, idx3),
r.toSeq.toList
.drop(idx4)
.take(arg2)
)
}
def mapper2(){}
...
def mapper5(){}
def my_udf(
arg0: Integer,
arg1: String,
arg2: Double,
arg3: Integer,
...
arg20: String
): Double = {
println("I'm actually doing something!")
val result1 = mapper1(arg1, arg2, arg3)
val result2 = mapper2(arg4, arg5, arg6, arg7)
...
val result5 = mapper5(arg18, arg19, arg20)
result1.take(arg0)
.zipAll(result1, 0.0, 0.0)
.map(x=>_1*x._2)
....
.zipAll(result5, 0.0, 0.0)
.foldLeft(0.0)(_+_)
}
spark.udf.register("myUDF", my_udf_)
val bigResult1 = bigInputFinal.withColumn("Newcolumnname",
callUDF(
"myUDF",
$"col1",
...
$"col20"
)
)
<postprocessing>
bigResultFinal
.filter(<configs>)
.select(<column names>)
.write
.format("parquet")
}
}
Подведем итог
Этот код выполняется до конца для каждого из двух входных файлов.
udf отображается только для первого файла.
Нет сообщений об ошибках или чего-либо другого, использующего второй файл, хотя вся логика не в формате udf, кажется, завершается успешно.
Любая помощь с благодарностью!