Spark & ​​hbase: java.io.IOException: сброс соединения по пиру - PullRequest
0 голосов
/ 28 апреля 2018

Буду признателен, если вы поможете мне.

Во время реализации потоковой передачи искры от kafka к hbase (код прилагается) мы столкнулись с проблемой «java.io.IOException: сброс соединения по пиру» (полный журнал прилагается).

Эта проблема возникает, если мы работаем с hbase, и в настройках spark включена опция динамического выделения. Если мы записываем данные в hdfs (таблицу кустов) вместо hbase или если опция динамического размещения отключена, ошибки не обнаруживаются.

Мы попытались изменить соединения zookeeper, время простоя spark executor, время ожидания сети. Мы попытались изменить службу передачи блоков в случайном порядке (NIO), но ошибка все еще существует. Если мы установим минимальное / максимальное количество исполнителей (менее 80) для динамического размещения, то проблем тоже не будет.

В чем может быть проблема? В Jira и переполнении стека много почти одинаковых проблем, но ничего не помогает.

Версия:

HBase 1.2.0-cdh5.14.0
Kafka  3.0.0-1.3.0.0.p0.40
SPARK2 2.2.0.cloudera2-1.cdh5.12.0.p0.232957
hbase-client/hbase-spark(org.apache.hbase) 1.2.0-cdh5.11.1

Настройки Spark:

--num-executors=80
--conf spark.sql.shuffle.partitions=200
--conf spark.driver.memory=32g
--conf spark.executor.memory=32g
--conf spark.executor.cores=4

Кластер: 1 + 8 узлов, 70 ЦП, 755 ГБ ОЗУ, жесткий диск x10,

Вход:

    18/04/09 13:51:56 INFO cluster.YarnClusterScheduler: Executor 717 on lang32.ca.sbrf.ru killed by driver.
18/04/09 13:51:56 INFO storage.BlockManagerMaster: Removed 717 successfully in removeExecutor
18/04/09 13:51:56 INFO spark.ExecutorAllocationManager: Existing executor 717 has been removed (new total is 26)
18/04/09 13:51:56 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 705.
18/04/09 13:51:56 INFO scheduler.DAGScheduler: Executor lost: 705 (epoch 45)
18/04/09 13:51:56 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 705 from BlockManagerMaster.
18/04/09 13:51:56 INFO cluster.YarnClusterScheduler: Executor 705 on lang32.ca.sbrf.ru killed by driver.
18/04/09 13:51:56 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(705, lang32.ca.sbrf.ru, 22805, None)
18/04/09 13:51:56 INFO spark.ExecutorAllocationManager: Existing executor 705 has been removed (new total is 25)
18/04/09 13:51:56 INFO storage.BlockManagerMaster: Removed 705 successfully in removeExecutor
18/04/09 13:51:56 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 716.
18/04/09 13:51:56 INFO scheduler.DAGScheduler: Executor lost: 716 (epoch 45)
18/04/09 13:51:56 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 716 from BlockManagerMaster.
18/04/09 13:51:56 INFO cluster.YarnClusterScheduler: Executor 716 on lang32.ca.sbrf.ru killed by driver.
18/04/09 13:51:56 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(716, lang32.ca.sbrf.ru, 28678, None)
18/04/09 13:51:56 INFO spark.ExecutorAllocationManager: Existing executor 716 has been removed (new total is 24)
18/04/09 13:51:56 INFO storage.BlockManagerMaster: Removed 716 successfully in removeExecutor
18/04/09 13:51:56 WARN server.TransportChannelHandler: Exception in connection from /10.116.173.65:57542
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)
18/04/09 13:51:56 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /10.116.173.65:57542 is closed
18/04/09 13:51:56 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 548.

Ответы [ 2 ]

0 голосов
/ 29 апреля 2018

Попробуйте установить эти два параметра. Также попробуйте кэширование Dataframe перед записью в HBase.

spark.network.timeout

spark.executor.heartbeatInterval
0 голосов
/ 29 апреля 2018

Пожалуйста, смотрите мой соответствующий ответ здесь: Каковы возможные причины получения TimeoutException: фьючерсы истекли через [n секунд] при работе со Spark

Мне также понадобилось некоторое время, чтобы понять, почему Клодера заявляет следующее:

Динамическое распределение и Spark Streaming

Если вы используете Spark Streaming, Cloudera рекомендует отключить динамическое выделение, установив для spark.dynamicAllocation.enabled значение false при запуске потоковых приложений.

Ссылка: https://www.cloudera.com/documentation/spark2/latest/topics/spark2_known_issues.html#ki_dynamic_allocation_streaming

...