Я пытаюсь следовать документации и создать табличную функцию для "выравнивания" некоторых данных. Табличная функция работает нормально при использовании joinLateral
для выравнивания, однако при использовании leftOuterJoinLateral
я получаю следующую ошибку. Я использую Scala и пробовал использовать API таблиц и SQL с одинаковым результатом:
Причина: java.lang.NullPointerException: Нулевой результат не может быть сохранен в классе Case.
Вот моя работа:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction
object example_job{
// Split the List[Int] into multiple rows
class Split() extends TableFunction[Int] {
def eval(nums: List[Int]): Unit = {
nums.foreach(x =>
if(x != 3) {
collect(x)
})
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironment()
val tableEnv = StreamTableEnvironment.create(env)
val splitMe = new Split()
// Create some dummy data
val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))
val table = tableEnv.fromDataStream(events, 'name, 'numbers)
.leftOuterJoinLateral(splitMe('numbers) as 'number)
.select('name, 'number)
table.toAppendStream[(String, Int)].print()
env.execute("Flink jira ticket example")
}
}
При изменении .leftOuterJoinLateral
на .joinLateral
я получаю ожидаемый результат:
(simon,1)
(simon,2)
При использовании .leftOuterJoinLateral
Я ожидаю что-то вроде:
(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)
Похоже, это может быть ошибкой в Scala API? Я хотел проверить здесь прежде, чем поднять билет на случай, если я сделаю что-нибудь глупое!