NullPointerException с использованием ROW_NUMBER в планировщике Blink - PullRequest
0 голосов
/ 29 мая 2020

При выполнении запроса

SELECT key, ROW_NUMBER() OVER (PARTITION BY key ORDER BY rt) FROM InputTable

Я получаю

Exception in thread "main" java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofInt$.length$extension(ArrayOps.scala:240)
    at scala.collection.mutable.ArrayOps$ofInt.length(ArrayOps.scala:240)
    at scala.collection.SeqLike$class.size(SeqLike.scala:106)
    at scala.collection.mutable.ArrayOps$ofInt.size(ArrayOps.scala:234)
    at scala.collection.mutable.Builder$class.sizeHint(Builder.scala:69)
    at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:22)
    at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:230)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
    at org.apache.flink.table.planner.codegen.agg.DeclarativeAggCodeGen.<init>(DeclarativeAggCodeGen.scala:81)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:220)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$3.apply(AggsHandlerCodeGenerator.scala:212)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.initialAggregateInformation(AggsHandlerCodeGenerator.scala:212)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:322)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createUnboundedOverProcessFunction(StreamExecOverAggregate.scala:344)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:231)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)

Я использую Flink 1.10 с планировщиком Blink.


Пример:

package com.example

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.table.api.EnvironmentSettings

object Application extends App {

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.scala._

  // Streaming environment configured for event time, so the 'ts.rowtime
  // attribute registered below is treated as the event-time attribute.
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Explicitly select the Blink planner in streaming mode — the planner
  // whose code generator throws the NullPointerException being reported.
  val config = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
  val tEnv = StreamTableEnvironment.create(env, config)

  // Single-element source; one record is enough to reproduce the failure,
  // since the NPE happens at plan translation time, not while processing data.
  val input = env.fromElements(
    Model("k", "v", 1L)
  )

  // Register the stream as table "InputTable" with columns k, v and
  // ts declared as the rowtime (event-time) attribute.
  tEnv.createTemporaryView("InputTable", input, 'k, 'v, 'ts.rowtime)

  // The OVER aggregation with ROW_NUMBER() that triggers the NPE inside
  // DeclarativeAggCodeGen during translation (see stack trace above the example).
  val output = tEnv.sqlQuery(
    "SELECT k, v, ROW_NUMBER() OVER (PARTITION BY k ORDER BY ts) AS rn FROM InputTable a"
  )

  // Converting to a retract stream forces plan translation, which is where the
  // exception is thrown; print() is never reached.
  tEnv.toRetractStream[Model](output).print()

  env.execute()
}

final case class Model(k: String, v: String, ts: Long)
...