I have a Flink job that runs fine when I upload it to the cluster manually (through the web UI).
But when I try to deploy it through RestClusterClient, it fails with a ClassNotFoundException (I do see the job appear on the cluster and then fail).
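import java.io.File
import org.apache.flink.client.deployment.StandaloneClusterId
import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.configuration.{Configuration, JobManagerOptions}

// Wrap the fat jar and its program arguments
val packagedProgram = new PackagedProgram(
  new File("/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar"),
  Array("--kafka-bootstrap-servers", "kafka:29092"): _*
)
// Point the client at the job manager
val configuration = new Configuration()
configuration.setString(JobManagerOptions.ADDRESS, "localhost")
configuration.setString("jobmanager.rpc.port", "6123")
val clusterClient = new RestClusterClient[StandaloneClusterId](
  configuration,
  StandaloneClusterId.getInstance()
)
// Submit the job with parallelism 2
clusterClient.run(packagedProgram, 2)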
Here are the logs from the job manager:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1164)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
at sun.reflect.GeneratedConstructorAccessor11.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: eu.euranova.chng.PlanBuilder$$anon$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
... 10 more
The class in question is indeed not packaged in the jar (it is a Scala anonymous function class). ![enter image description here](https://i.stack.imgur.com/jy8nD.png)
But then... why does the job start fine when submitted manually, but not through RestClusterClient, with exactly the same fat jar?
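For anyone who wants to reproduce the jar check without opening the archive by hand, here is a minimal sketch that looks up a class entry with the standard `java.util.jar.JarFile` API. The names `JarClassCheck`, `entryName`, and `containsClass` are hypothetical helpers made up for this illustration; they are not part of Flink.

```scala
import java.util.jar.JarFile

// Hypothetical helper, for illustration only (not part of Flink).
object JarClassCheck {

  /** Converts a fully qualified class name to the entry path used inside a jar. */
  def entryName(className: String): String =
    className.replace('.', '/') + ".class"

  /** Returns true if the jar at `jarPath` contains a .class entry for `className`. */
  def containsClass(jarPath: String, className: String): Boolean = {
    val jar = new JarFile(jarPath)
    try jar.getEntry(entryName(className)) != null
    finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("usage: JarClassCheck <path-to-jar> <fully.qualified.ClassName>")
    } else {
      val present = containsClass(args(0), args(1))
      println(s"${args(1)} present in ${args(0)}: $present")
    }
  }
}
```

Calling `JarClassCheck.containsClass` with the path of the fat jar and the class name from the stack trace (`eu.euranova.chng.PlanBuilder$$anon$1`) shows whether that anonymous class was packaged.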
Also, after setting the Flink log level to DEBUG, I see many log lines like these in IntelliJ:
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=17, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.ShareEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 17
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming SideOutputTransformation{id=21, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=20, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=19, name='Co-Map', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming PartitionTransformation{id=12, name='Partition', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), eu.euranova.model.InstallEvent(userId: String, gameId: String, timestamp: Long, name: String, dumVar: Long)>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 19
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 20
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming SideOutputTransformation{id=24, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator - Transforming TwoInputTransformation{id=23, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph - CO-TASK: 23
... which suggests that at least the "driver" runs locally while the tasks are sent to the cluster (the other explanation would be that the cluster sends all its logs back to the RestClusterClient, which I find implausible).
Do you know what is causing this and how I can make it work?
Thanks in advance for your help.
Best regards,
Laurent.