Не удалось выполнить пользовательскую функцию при передаче сложного типа в UDF - PullRequest
0 голосов
/ 23 мая 2019

У меня очень странная проблема, с которой мне действительно нужна помощь.

У меня есть данные, которые я искажаю в Spark (2.3), запустив crossJoin, а затем вызвав пару UDF. Когда я запускаю приведенный ниже код, все хорошо, и результаты выводятся на консоль. Когда я раскомментирую фильтр (перед методом show), я получаю NoSuchElementException, но на удивление он выдается из firstInArray UDF. Единственное, что выделяется для меня, это то, что leftNameTokenComplement UDF принимает строку в качестве ввода, и это complexType.

Еще одно интересное наблюдение: если я заменим .show() на .cache(), а затем использую кэшированный набор данных для .filter().show(), я не получу исключения. Это почти как если бы пересчет UDF не работал.

Буду очень признателен за любую помощь !!!

Dataset<Row> connectedDf = sparkSession.read().json(this.workspaceDir + "connected-df");

connectedDf.as("df1")
    .crossJoin(connectedDf.as("df2"))
    .filter( "df1.id < df2.id AND df1.nameGroupId != df2.nameGroupId" )
    .withColumn("leftComp", functions.callUDF("leftNameTokenComplement", col("df1.all"), col("df2.all") ))
    .filter( functions.size(col("leftComp")).equalTo(1) )       
    .withColumn("src1", functions.callUDF("firstInArray", col("leftComp")) )
    .withColumn("matchedPhonic", functions.callUDF("isNameTokenPhonicMatch", col("src1")) )
    //.filter( col("matchedPhonic").equalTo(true)) Exception thrown if uncommented
    .show();

Вывод, когда закомментирован фильтр:

+--------------------+-------+-----------+--------------------+--------+------------+--------+----+-------------+
|                 all|     id|nameGroupId|                 all|      id| nameGroupId|leftComp|src1|matchedPhonic|
+--------------------+-------+-----------+--------------------+--------+------------+--------+----+-------------+
|[[[UOLA], xid1, j...|joe san| 6302870528|[[[OLA, ALA], xid...|john san|936302870528|   [joe]| joe|         true|
+--------------------+-------+-----------+--------------------+--------+------------+--------+----+-------------+

Исключительная ситуация, если оператор фильтра не закомментирован

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (array<string>) => string)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.ScalaUDF_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
    at org.data.analysis.ops.name.GeneralUDFs$FirstInArrayFunction.call(GeneralUDFs.java:408)
    at org.data.analysis.ops.name.GeneralUDFs$FirstInArrayFunction.call(GeneralUDFs.java:1)
    at org.apache.spark.sql.UDFRegistration$$anonfun$28.apply(UDFRegistration.scala:711)

Вот моя схема входных данных вместе с некоторыми примерами данных:

{
  "type" : "struct",
  "fields" : [ {
    "name" : "all",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "phonics",
          "type" : {
            "type" : "array",
            "elementType" : "string",
            "containsNull" : true
          },
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "stdToken",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "token",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "type",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "id",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "nameGroupId",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  } ]
}

    {"id":"joe san", "all":[{"type":"ty1","token":"joe","stdToken":"xid1","phonics":["UOLA"]},{"type":"tp2","token":"san","stdToken":"xid12","phonics":["RACI","RADI"]} ],"nameGroupId":6302870528}
{"id":"john san","all":[{"type":"ty1","token":"john","stdToken":"xid11","phonics":["OLA","ALA"]},{"type":"tp2","token":"san","stdToken":"xid12","phonics":["RACI","RADI"]} ],"nameGroupId":936302870528}

Подпись UDF:

leftNameTokenComplement
UDF2<Seq<Row>, Seq<Row>, List<String>>

firstInArray
UDF1<WrappedArray, String>

isNameTokenPhonicMatch
UDF1<String, Boolean>
...