Я скачал и установил зависимости для Apache Flink 1.11.1-bin-scala_2.11.tgz. Запустите единственный экземпляр (только один компьютер) с помощью команды:
> "/home/ubuntu/flink-1.11.1/bin/start-cluster.sh"
import org.apache.flink.api.scala._
object Test1 {
def main(args: Array[String]): Unit = {
//val env = ExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.createRemoteEnvironment("192.168.0.110", 6123, jarFiles = "target\\scala-2.11\\untitled2_2.11-0.1.jar")
val text = env.fromElements("To be, or not to be, that is the question")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.print()
env.execute("Test1 WordCount")
}
}
My scala code runs locally, and also when I scp a copy over to the Flink server if I use "ExecutionEnvironment.getExecutionEnvironment". I run it on the server manually by running "./bin/flink run /path/untitled2_2.11-0.1.jar". It ALWAYS fails for some reason when I try to connect remotely from Intellij IDE -> Flink Server using "createRemoteEnvironment". The error I keep getting is the following:
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:889)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:873)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
at Test1$.main(Test1.scala:16)
at Test1.main(Test1.scala)
UPDATE #1:
Link: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-TooLongFrameException-in-cluster-mode-td20883.html Сводка: номер порта изменен с 6123 на 8081, задание теперь регистрируется в сети gui, хотя раньше этого не происходило. Новые версии Flink, по-видимому, обмениваются данными через указанный пользователем порт REST. Теперь он не работает с другой ошибкой:
2020-08-07 18:24:22
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1449)
at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.setup(SynchronousChainedCombineDriver.java:90)
at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1316)
at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:316)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.io.InvalidClassException: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more
ОБНОВЛЕНИЕ № 3: Это вызвано несоответствием версий между установленным scala на Apache сервере Flink и тем, что использует моя IDE.
Apache Flink:
# scala -version
Понизьте версию IDE до Scala, чтобы она соответствовала, оставив код прежним