Работа Spark завершается без выполнения udf - PullRequest
0 голосов
/ 19 ноября 2018

У меня возникла проблема с длинной, сложной искровой работой, которая содержит udf.

Проблема, с которой я столкнулся, заключается в том, что udf не вызывается должным образом, хотя сообщения об ошибке нет.

Я знаю, что он не вызывается должным образом, потому что вывод записывается, только то, что udf должен был рассчитать, равно NULL , и при локальной отладке не отображаются операторы печати. ​​

Единственная причина в том, что этот код ранее работал с использованием разных входных данных, что означает, что ошибка должна иметь отношение к вводу.

Изменение входных данных в основном означает, что используются разные имена столбцов, которые рассматриваются в коде.

Операторы печати выполняются с первого, «рабочего» ввода.

Оба входа создаются с использованием одной и той же серии шагов из одной и той же базы данных, и при проверке не возникает проблем ни с одним из них.

Я никогда раньше не сталкивался с подобным поведением, и любые указания на то, что может его вызвать, будут оценены.

Код монолитный и негибкий - я работаю над рефакторингом, но разбить его не так просто. Это краткая версия происходящего:

package mypackage

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types._

import scala.collection.{Map => SMap}


object MyObject {

  def main(args: Array[String]){
    val spark: SparkSession = SparkSession.builder()
      .appName("my app")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val bigInput = spark.read.parquet("inputname.parquet")
    val reference_table = spark.read.parquet("reference_table.parquet")
    val exchange_rate = spark.read.parquet("reference_table.parquet")


    val bigInput2 = bigInput
      .filter($"column1" === "condition1")
      .join(joinargs)
      .drop(dropargs)

    val bigInput3 = bigInput
      .filter($"column2" === "condition2")
      .join(joinargs)
      .drop(dropargs)

    <continue for many lines...>

    def mapper1(
      arg1: String,
      arg2: Double,
      arg3: Integer
    ): List[Double]{
      exchange_rate.map(
        List(idx1, idx2, idx3),
        r.toSeq.toList
          .drop(idx4)
          .take(arg2)
      )
    }

    def mapper2(){}
    ...
    def mapper5(){}

    def my_udf(
      arg0: Integer,
      arg1: String,
      arg2: Double,
      arg3: Integer,
      ...
      arg20: String
    ): Double = {
      println("I'm actually doing something!")
      val result1 = mapper1(arg1, arg2, arg3)
      val result2 = mapper2(arg4, arg5, arg6, arg7)
      ...
      val result5 = mapper5(arg18, arg19, arg20)
      result1.take(arg0)
        .zipAll(result1, 0.0, 0.0)
        .map(x=>_1*x._2)
        ....
        .zipAll(result5, 0.0, 0.0)
        .foldLeft(0.0)(_+_)
    }

    spark.udf.register("myUDF", my_udf_)
    val bigResult1 = bigInputFinal.withColumn("Newcolumnname",
      callUDF(
        "myUDF",
        $"col1",
        ...
        $"col20"
      )
    )

    <postprocessing>
    bigResultFinal
        .filter(<configs>)
        .select(<column names>)
        .write
        .format("parquet")
  }
}

Подведем итог

Этот код выполняется до конца для каждого из двух входных файлов.

udf отображается только для первого файла.

Нет сообщений об ошибках или чего-либо другого, использующего второй файл, хотя вся логика не в формате udf, кажется, завершается успешно.

Любая помощь с благодарностью!

1 Ответ

0 голосов
/ 19 марта 2019

Здесь UDF не вызывается, потому что искра Ленивый это не вызывает UDF, если вы не используете какие-либо действия на кадре данных. Этого можно добиться, форсируя действия с кадрами данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...