org. apache .spark.shuffle.FetchFailedException: соединение с server1 / xxx.xxx.x.xxx: 7337 закрыто - PullRequest
1 голос
/ 14 июля 2020

Highlight
Я обновил Spark и пытаюсь запустить уже существующее приложение Spark Streaming (принимает имена файлов через поток, которые затем считываются из HDFS, преобразовываются с использованием операций rdd и dataframes, наконец, проанализированы данные set сохраняется в HBase) на YARN, который не работает и не может решить проблему.

Сведения о среде приведены ниже

Работа с версиями

Платформа
ОС: RHEL 6.6, 128 ГБ ОЗУ, 42 ТБ HDD, 32 ядра
Java: 1.8.0_25
Scala: 2.11
Had oop: 2.7.7
Spark: 2.4.6 с двоичными файлами Had oop 2.7
HBase: 1.4.12

Не работает после обновления

Spark: 3.0.0 с двоичными файлами Had oop 2.7
Скомпилирован тот же код с использованием Scala 2.12, как требуется для Spark 3.0.0, в который были внесены некоторые незначительные изменения в соответствии с изменением версии, без каких-либо логических изменений.

Обязательная пряжа конфигурации

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
  <name>spark.shuffle.service.port</name>
  <value>7337</value>
</property>

Конфигурация Spark передана при запуске задания

spark.app.name=Ingestion
spark.eventLog.enabled=true
spark.yarn.historyServer.address=${hadoopconf-yarn.resourcemanager.hostname}:18088
spark.eventLog.dir=hdfs:///user/hduser/applicationHistory
spark.submit.deployMode=cluster
spark.driver.memory=1GB
spark.driver.cores=1
spark.executor.memory=5GB
spark.executor.cores=5
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.sql.shuffle.partitions=2001
spark.logging.level=INFO
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.archive=hdfs:///spark-3.0.0-bin-hadoop2.7-jars.zip
spark.ui.killEnabled=false
spark.driver.memoryOverhead=512
spark.executor.memoryOverhead=1024
spark.yarn.maxAppAttempts=4
spark.yarn.am.attemptFailuresValidityInterval=1h

Проблема
Тот же фрагмент кода работает с версиями Spark 2.4.4, 2.4.5, 2.4.6 с тем же набором конфигурации Had oop, YARN, Spark. Когда я обновляюсь до Spark 3.0.0, код начинает давать сбой с исключением ниже. Пробовали несколько настроек, например, увеличение ресурсов, уменьшение разделов и т. Д. c, но безуспешно. Проверили порт 7337 через te lnet, он также открыт и прослушивает. После недели отладки я не смог найти никакого решения для этого, и, похоже, нет причин для закрытия соединения порта в случайном порядке.
Работает с набором данных, который вряд ли составляет 50 МБ. Тот же код смог обработать более 300 МБ данных с точно такой же конфигурацией со Spark 2.4.x. Это странно !!

Исключение

org.apache.spark.shuffle.FetchFailedException: Connection from server1/xxx.xxx.x.xxx:7337 closed
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:748)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:663)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:70)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:155)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:129)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$Lambda$597/1323895653.apply(Unknown Source)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:859)
    at org.apache.spark.rdd.RDD$$Lambda$584/1207730390.apply(Unknown Source)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.executor.Executor$TaskRunner$$Lambda$421/1364680867.apply(Unknown Source)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection from server1/xxx.xxx.x.xxx:7337 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

Кто-нибудь еще сталкивался с этой проблемой? Если да, дайте мне знать, как вы ее решили. У меня нет никаких подсказок, что еще проверить. Любая помощь будет принята с благодарностью
Спасибо

1 Ответ

1 голос
/ 14 июля 2020

Spark 3.x не будет работать со старой службой перемешивания.

Попробуйте со следующим изменением конфигурации, если вы хотите сохранить старую службу перемешивания.

spark.shuffle.useOldFetchProtocol=true

Ссылка https://issues.apache.org/jira/browse/SPARK-29435

...