Flink - исключение NullPointerException при выполнении запроса API таблицы при выполнении итерации по DataSet - PullRequest
0 голосов
/ 22 сентября 2019

У меня есть DataSet типа:

scala> userDs
res162: org.apache.flink.api.scala.DataSet[org.apache.flink.types.Row] = org.apache.flink.api.scala.DataSet@584e2243

Я хочу выполнить запрос с использованием Flink Table API несколько раз для каждого элемента этого DataSet.Итак, я использую следующее утверждение:

scala> userDs.iterate(1) {x => x.map(c => {btenv.sqlQuery("select _2 from sn where _1 = 'someName'") ; c} ) }.print

Я проверил, и каждая отдельная часть кода, кажется, работает правильно, и запрос также работает, как и ожидалось, но когда он выполняется витерация, как это, то я получаю NullPointerException следующим образом:

scala> userDs.iterate(1) {x => x.map(c => {btenv.sqlQuery("select _2 from sn where _1 = 'kim'") ; c} ) }.print
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: a59a42e7fcee6d2fb618d2e958732548)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
  at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
  at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
  at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
  at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
  at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
  at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
  ... 30 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
  at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
  ... 40 more
Caused by: java.lang.NullPointerException
  at $anonfun$1$$anonfun$apply$1.apply(<console>:71)
  at $anonfun$1$$anonfun$apply$1.apply(<console>:71)
  at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
  at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
  at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:157)
  at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:122)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)

Есть идеи, что нужно исправить?Можно ли выполнять итеративный запрос к API таблицы для DataSet, который также получен посредством запроса из той же таблицы?

...