Azure Хранение BLOB-объектов с Apache Flink 1.10 - PullRequest
0 голосов
/ 14 апреля 2020

Я пытаюсь использовать Azure Blob Storage с Apache Flink 1.10 для целей контрольных точек.

Я выполнил все инструкции, упомянутые в [Документация Flink] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/azure.html

Шаг 1:

mkdir ./plugins/azure-fs-hadoop cp ./opt/flink-azure-fs-hadoop-1.10.0.jar ./plugins/azure-fs-hadoop/

Шаг 2:

Это то, что у меня есть в flink-conf. xml

#Azure Storage Key **fs.azure.account.key.<storage-account>.blob.core.windows.net:xxxxxxxxxxxxxxxxxx**

Шаг 3:

Использование Azure Хранение BLOB-объектов для контрольных точек

Это то, что у меня есть в моей работе Flink

final StateBackend stateBackend = new FsStateBackend("wasb://flink-blob@$<storage-account>.blob.core.windows.net/checkpoint");

Я не уверен, что здесь что-то пропустил, но когда я отправляю работу, я получаю ниже Exception (AskTimeoutException от Actor).

    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
    at com.example.flink.checkpointing.CheckpointExample.main(CheckpointExample.java:78)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
    ... 17 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
    at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
    at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#75666936]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
    at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
    at java.base/java.lang.Thread.run(Thread.java:834)

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    ... 4 more```

1 Ответ

1 голос
/ 15 апреля 2020

Я думаю, что ваша проблема двойная. Истинная причина сбоя скрыта из-за AskTimeoutException. Эта проблема была решена с FLINK-16018 , который будет выпущен с Flink 1.10.1. Проблема в том, что значение тайм-аута слишком агрессивно, так что долгая отправка задания на клиентской стороне будет неудачной.

Для истинной причины сбоя я бы рекомендовал взглянуть на jobmanager.log у Флинка. Он должен содержать информацию о том, что пошло не так. Я подозреваю, что существует неправильная конфигурация Azure хранилища BLOB-объектов.

...