ClassNotFoundException при отправке задания через RestClusterClient - PullRequest
0 голосов
/ 13 февраля 2020

У меня есть задание Flink, которое работает нормально, когда я загружаю его вручную в кластер (используя пользовательский интерфейс).

Но когда я пытаюсь развернуть его через RestClusterClient, оно не дает мне ClassNotFoundException (я делаю увидеть, как задание появляется в кластере и не работает).

val packagedProgram = new PackagedProgram(
    new File("/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar"),
    Array("--kafka-bootstrap-servers", "kafka:29092"): _*   
)
val configuration = new Configuration()  
configuration.setString(JobManagerOptions.ADDRESS, "localhost")  
configuration.setString("jobmanager.rpc.port", "6123")

val clusterClient = new RestClusterClient[StandaloneClusterId](
    configuration,
    StandaloneClusterId.getInstance()
)  
clusterClient.run(packagedProgram, 2)

Вот журналы из диспетчера заданий:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
    at  org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
    at sun.reflect.GeneratedConstructorAccessor11.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.ClassNotFoundException: eu.euranova.chng.PlanBuilder$$anon$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
    ... 10 more

Данный класс действительно не является пакетами в банке ( это scala класс анонимных функций). enter image description here

Но тогда ... почему работа запускается нормально при запуске вручную по сравнению с RestClusterClient с использованием точно такой же жирной банки?

Кроме того, когда установив уровень журнала flink на DEBUG, я вижу много журналов в IntelliJ, таких как этот:

DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=17, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.ShareEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 17
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming SideOutputTransformation{id=21, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=20, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=19, name='Co-Map', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming PartitionTransformation{id=12, name='Partition', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), eu.euranova.model.InstallEvent(userId: String, gameId: String, timestamp: Long, name: String, dumVar: Long)>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 19
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 20
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming SideOutputTransformation{id=24, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=23, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 23

... это указывает на то, что по крайней мере «драйвер» работает локально, в то время как задачи отправлено в кластер (другое объяснение будет то, что кластер отправляет все журналы обратно в RestClusterClient, который я нахожу нелепо)

Не могли бы вы знать, что является причиной этого и как я могу заставить его работать?

Заранее благодарим за помощь.

С наилучшими пожеланиями,

Лоран.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...