Проблема с отправкой задания из пользовательского интерфейса задания Flink (исключение: org. apache .flink.client.program.OptimizerPlanEnvironment $ ProgramAbortException) - PullRequest
0 голосов
/ 17 июня 2020

У меня есть простой java код для задания flink

List<Tuple2> list = new ArrayList<>();
for (int i  = 0; i < 10; i++) {
     list.add(new Tuple2(Integer.valueOf(i), "test" + i));
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(list).print();
env.execute("job1");

Я упаковал этот код и создал банку: скажем, flink-processor-0.1-SNAPSHOT.jar, загрузите его в JobManager из интерфейса отправки задания . Нет проблем с загрузкой. Я вижу, что EntryClass имеет основной класс (com.ab c .xyz.streaming.FlinkProcessor). Теперь я отправляю задание из «Submit Job Ui» с некоторыми параметрами (--ns.conf1 ab c .file - -ns.conf2 xyz.file) и указал основной класс (com.ab c .xyz.streaming.FlinkProcessor). Не удается отправить задание. В JobManager я вижу следующие ошибки.

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:53)
    at com.abc.xyz.streaming.FlinkProcessor.run(FlinkProcessor.java:114)
    at com.abc.xyz.streaming.FlinkProcessor.main(FlinkProcessor.java:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-06-16 16:14:20,588 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

System.err: Running flink job :2020-06-16T23:14:20.476Z
 Error msg: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:53)

Когда я отправляю то же задание, используя команду flink run, оно работает нормально. Ошибок нет.

flink run - c com.ab c .xyz.streaming.FlinkProcessor /Users/abc/target/flink-processor-0.1-SNAPSHOT.jar - -ns.conf1 ab c .file --ns.conf2 xyz.file

Не уверены, что мне здесь не хватает? Любая помощь приветствуется.

1 Ответ

0 голосов
/ 17 июня 2020

Нашел проблему. Я обертывал свой код с помощью

try { 
 List<Tuple2> list = new ArrayList<>();
 for (int i  = 0; i < 10; i++) {
     list.add(new Tuple2(Integer.valueOf(i), "test" + i));
 }
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.fromCollection(list).print();
 env.execute("job1");
} catch (Throwable t) { 
ignore t
};

изменил блок исключения try {.....} catch (Exception exp) {ignore exp}; После этого он начал работать.

Спасибо!

...