Исключение NullPointerException при использовании Flink's leftOuterJoinLateral в Scala - PullRequest
1 голос
/ 12 октября 2019

Я пытаюсь следовать документации и создать табличную функцию для "выравнивания" некоторых данных. Табличная функция работает нормально при использовании joinLateral для выравнивания, однако при использовании leftOuterJoinLateral я получаю следующую ошибку. Я использую Scala и пробовал использовать API таблиц и SQL с одинаковым результатом:

Причина: java.lang.NullPointerException: Нулевой результат не может быть сохранен в классе Case.

Вот моя работа:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction

object example_job{
  // Split the List[Int] into multiple rows
  class Split() extends TableFunction[Int] {
    def eval(nums: List[Int]): Unit = {
      nums.foreach(x =>
        if(x != 3) {
          collect(x)
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    val splitMe = new Split()

    // Create some dummy data
    val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))

    val table = tableEnv.fromDataStream(events, 'name, 'numbers)
      .leftOuterJoinLateral(splitMe('numbers) as 'number)
      .select('name, 'number)
    table.toAppendStream[(String, Int)].print()
    env.execute("Flink jira ticket example")
  }
}

При изменении .leftOuterJoinLateral на .joinLateral я получаю ожидаемый результат:

(simon,1)
(simon,2)

При использовании .leftOuterJoinLateralЯ ожидаю что-то вроде:

(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)

Похоже, это может быть ошибкой в ​​Scala API? Я хотел проверить здесь прежде, чем поднять билет на случай, если я сделаю что-нибудь глупое!

1 Ответ

1 голос
/ 14 октября 2019

Проблема в том, что Flink по умолчанию ожидает, что все поля строки не равны NULL. Вот почему программа завершается ошибкой, когда видит результат null от операции внешнего соединения. Чтобы принять значения null, необходимо либо отключить проверку нуля с помощью

val tableConfig = tableEnv.getConfig
tableConfig.setNullCheck(false)

, либо указать тип результата для допуска значений NULL, например, указав пользовательский тип вывода POJO:

table.toAppendStream[MyOutput].print()

с

class MyOutput(var name: String, var number: Integer) {
  def this() {
    this(null, null)
  }

  override def toString: String = s"($name, $number)"
}
...