У меня есть DataSet типа:
scala> userDs
res162: org.apache.flink.api.scala.DataSet[org.apache.flink.types.Row] = org.apache.flink.api.scala.DataSet@584e2243
Я хочу выполнить запрос с использованием Flink Table API несколько раз для каждого элемента этого DataSet.Итак, я использую следующее утверждение:
scala> userDs.iterate(1) {x => x.map(c => {btenv.sqlQuery("select _2 from sn where _1 = 'someName'") ; c} ) }.print
Я проверил, и каждая отдельная часть кода, кажется, работает правильно, и запрос также работает, как и ожидалось, но когда он выполняется витерация, как это, то я получаю NullPointerException следующим образом:
scala> userDs.iterate(1) {x => x.map(c => {btenv.sqlQuery("select _2 from sn where _1 = 'kim'") ; c} ) }.print
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: a59a42e7fcee6d2fb618d2e958732548)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
... 30 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 40 more
Caused by: java.lang.NullPointerException
at $anonfun$1$$anonfun$apply$1.apply(<console>:71)
at $anonfun$1$$anonfun$apply$1.apply(<console>:71)
at org.apache.flink.api.scala.DataSet$$anon$1.map(DataSet.scala:490)
at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:157)
at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:122)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Есть идеи, что нужно исправить?Можно ли выполнять итеративный запрос к API таблицы для DataSet, который также получен посредством запроса из той же таблицы?