Перейдите к EMR: отправлено задание с org.apache.flink.client.program.ProgramInvocationException: Не удалось получить результат выполнения - PullRequest
0 голосов
/ 11 октября 2019

Я запускаю задание flink emr и получаю следующие результаты:

    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.myorg.quickstart.StreamingJob.main(StreamingJob.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:309)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:76)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:351)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalArgumentException: The scheme (hdfs://, file://, etc) is null. Please specify the file system scheme explicitly in the URI.
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.validatePath(AbstractFileStateBackend.java:187)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:109)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend.<init>(AbstractFileStateBackend.java:131)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.<init>(MemoryStateBackend.java:238)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.configure(MemoryStateBackend.java:286)
    at org.apache.flink.runtime.state.memory.MemoryStateBackendFactory.createFromConfig(MemoryStateBackendFactory.java:33)
    at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:225)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
    at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1198)
    at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1178)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:287)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
    ... 10 more

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more

Долгосрочное приложение flink yarn запущено и работает, и я настроил кластер emr с высокой доступностью в соответствии с https://docs.aws.amazon.com/emr/latest/ReleaseGuide/flink-configure.html.

Я также пытался запустить примерную пакетную программу через flink run /usr/lib/flink/examples/batch/WordCount.jar, и я вижу, что она успешно завершена в веб-интерфейсе.

Существуют ли какие-либо другие конфигурации, которые я могу пропустить из-за этой ошибки?

1 Ответ

0 голосов
/ 14 октября 2019

Согласно трассировке стека это выглядит так, как будто вы настроили задание на использование MemoryStateBackend. При настройке этого состояния backend также можно указать путь к контрольной точке и точке сохранения. При настройке этих путей важно указать схему hdfs:// (или любую другую распределенную файловую систему, которая поддерживается и которую вы хотите использовать), поскольку системе необходимо знать, какой Filesystem использовать. Вместо указания пути как my/checkpoint/path, он должен быть hdfs://my/checkpoint/path.

...