Слот Flink исключен - PullRequest
       7

Слот Flink исключен

0 голосов
/ 08 января 2019

Я получаю следующее исключение

org.apache.flink.util.FlinkException: The assigned slot container_1546939492951_0001_01_003659_0 was removed.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:789)
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:759)
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:951)
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:823)
at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:346)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

при запуске пакетного процесса, включающего объединение двух очень больших наборов данных.

Вот что я вижу в обзоре. Ошибка произошла в диспетчере задач, который не получил никаких входных данных. Как ни странно, предыдущий набор (раздел -> плоская карта -> карта) ничего не отправлял этому диспетчеру задач, несмотря на то, что перед ним был перебаланс.

Я использую EMR. Я вижу, что есть slot.idle.timeout , будет ли это иметь эффект, и если да, то как мне указать его для этой работы? Это можно сделать из командной строки?

enter image description here

Ответы [ 2 ]

0 голосов
/ 11 мая 2019

Вы можете добавить следующую строку в коде Java.

env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

тогда ваша работа начнется с отмены автоматически.

0 голосов
/ 23 января 2019

возможно, это проблема тайм-аута, но обычно, когда это происходит со мной, это происходит из-за сбоя (например, YARN убивает контейнер, потому что он работает за пределами pmem или vmem). Я бы рекомендовал тщательно проверить JobManager и все файлы журнала TaskManager.

...