Невозможно запустить удалённую задачу на экземпляре Apache Flink - PullRequest
0 голосов
/ 05 августа 2020

Я скачал и установил зависимости для Apache Flink 1.11.1-bin-scala_2.11.tgz. Запустил единственный экземпляр (только один компьютер) с помощью команды:

> "/home/ubuntu/flink-1.11.1/bin/start-cluster.sh" enter image description here

enter image description here

import org.apache.flink.api.scala._

/**
 * Minimal batch WordCount submitted from the IDE to a remote Flink cluster.
 *
 * The jar passed to `createRemoteEnvironment` must be built with the same
 * Scala binary version as the Flink distribution on the server (scala_2.11
 * here); a mismatch fails on the TaskManager with
 * `java.io.InvalidClassException: ... local class incompatible`.
 */
object Test1 {
  def main(args: Array[String]): Unit = {
    // For local runs, or when submitting with `./bin/flink run`, use instead:
    //val env = ExecutionEnvironment.getExecutionEnvironment

    // Flink 1.11 clients submit jobs through the REST endpoint (default port 8081),
    // not the JobManager RPC port 6123; using 6123 yields
    // "JobSubmissionException: Failed to submit JobGraph".
    val env = ExecutionEnvironment.createRemoteEnvironment(
      "192.168.0.110",
      8081,
      jarFiles = "target\\scala-2.11\\untitled2_2.11-0.1.jar"
    )

    val text = env.fromElements("To be, or not to be, that is the question")

    // Split into lowercase words, pair each with 1, then sum the counts per word.
    val counts = text
      .flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // print() collects the result and therefore triggers job execution itself.
    // A subsequent env.execute() would fail because no new sinks are defined
    // after this call, so it is intentionally omitted.
    counts.print()
  }
}

My scala code runs locally, and also when I scp a copy over to the Flink server if I use "ExecutionEnvironment.getExecutionEnvironment". I run it on the server manually by running "./bin/flink run /path/untitled2_2.11-0.1.jar". It ALWAYS fails for some reason when I try to connect remotely from Intellij IDE -> Flink Server using "createRemoteEnvironment". The error I keep getting is the following:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:277)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:981)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:889)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:873)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
    at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
    at Test1$.main(Test1.scala:16)
    at Test1.main(Test1.scala)

ОБНОВЛЕНИЕ № 1: Ссылка: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-5-TooLongFrameException-in-cluster-mode-td20883.html Сводка: номер порта изменён с 6123 на 8081, задание теперь регистрируется в веб-интерфейсе (web GUI), хотя раньше этого не происходило. Новые версии Flink, по-видимому, обмениваются данными с клиентом через порт REST. Теперь задание завершается с другой ошибкой:

2020-08-07 18:24:22
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
    at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1449)
    at org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriver.setup(SynchronousChainedCombineDriver.java:90)
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
    at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1316)
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:316)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
    ... 3 more
Caused by: java.io.InvalidClassException: scala.collection.immutable.List$SerializationProxy; local class incompatible: stream classdesc serialVersionUID = -7905219378619747021, local class serialVersionUID = 1
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
    ... 9 more

ОБНОВЛЕНИЕ № 3: Это вызвано несоответствием между версией Scala, установленной на сервере Apache Flink, и версией, которую использует моя IDE.

Apache Flink:

# scala -version

Понизьте версию Scala в IDE, чтобы она соответствовала версии на сервере, оставив код прежним

1 Ответ

0 голосов
/ 08 августа 2020

Подводя итог серии обновлений, которые я сделал для решения моей проблемы:

  1. Измените порт подключения в createRemoteEnvironment() на 8081 (порт REST)
  2. Проверьте версию Scala на сервере Apache Flink, затем понизьте версию Scala в среде IDE, чтобы она соответствовала
...