Как исправить проблемы с соединением при отправке тяжелых файлов в таблицу улья - PullRequest
3 голосов
/ 31 мая 2019

У меня есть файлы в формате hdf, которые я хочу поместить в таблицу Hive.Операция выполняется пакетом Spark в Java-приложении.Код, выполняющий задачу, выглядит следующим образом:

[...]

final Dataset<File> fileDs = rawDs.map(record -> {
        return FileService.map(record.getList(2));
      }, Encoders.bean(File.class));

final Dataset<Row> fileDsWithId = fileDs.withColumn("id", functions.lit(id));
fileDsWithId.repartition(fileDsWithId.col("id")); 

fileWithId.write().mode(SaveMode.Append)
  .format("orc")
  .partitionBy("id")
  .option("path", hdfs://..../mydatabase.db/mytable")
  .saveAsTable("mydatabase.mytable");

Когда я использую небольшой файл (1 или 2 строки данных), приложение работает нормально, задание успешно завершается в течение 30 секунд.Таблица создана в Hive, и я могу отображать данные с помощью запроса Select *.Это также работает, когда таблица уже существует.Данные просто добавляются к существующим.Структуры таблицы генерации в улье кажутся хорошими.Это соответствует моим данным.

Но когда я пытаюсь обработать больший файл (3.7Mo с примерно 1000 строками данных), работа завершается неудачно через 15 минут.Соответствующий файл orc создается в hdfs, но он пуст, и Hive не знает об этом.

В файле журнала показано несколько ошибок, подобных этим:

2019-05-31 14:20:07,500 - [ERROR] [                           dispatcher-event-loop-3] pache.spark.scheduler.cluster.YarnClusterScheduler - [{}] - Lost executor 31 on XXXXXX: Container marked as failed: container_e71_1559121287708_0019_02_000032 on host: XXXXXXXXX. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143.
Killed by external signal
[...]
java.lang.RuntimeException: java.io.IOException: Connection reset by peer
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
        at org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
        at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
...
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
...
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
[...]       
        2019-05-31 14:20:17,898 - [ERROR] [                                shuffle-client-4-1]    org.apache.spark.network.client.TransportClient - [{}] - Failed to send RPC 9035939448873337359 to XXXXXXXX: java.nio.channels.ClosedChannelExceptionsg
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
2019-05-31 14:20:17,899 - [ERROR] [          Executor task launch worker for task 244] apache.spark.network.client.TransportClientFactory - [{}] - Exception while bootstrapping client after 5999 mssg
java.lang.RuntimeException: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXX: java.nio.channels.ClosedChannelException
        at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
        at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
        at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
        at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:115)
     ...
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXXXXX: java.nio.channels.ClosedChannelException
        at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:852)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:738)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:733)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:725)
        at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:35)
...
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
[...]       
2019-05-31 14:20:22,907 - [INFO ] [                             Block Fetch Retry-6-1] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Retrying fetch (2/3) for 1 outstanding blocks after 5000 mssg
2019-05-31 14:20:27,909 - [ERROR] [                             Block Fetch Retry-6-2] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Exception while beginning fetch of 1 outstanding blocks (after 2 retries)sg
java.io.IOException: Failed to connect to XXXXXXXXX
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
...
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        ... 2 more
[...]       
        2019-05-31 14:20:32,915 - [WARN ] [          Executor task launch worker for task 244]              org.apache.spark.storage.BlockManager - [{}] - Failed to fetch remote block broadcast_2_piece0 from BlockManagerId(1, XXXXXXX, 44787, None) (failed attempt 1)sg
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:105)
        at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:642)
...
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to XXXXXXXXX
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        ... 1 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
...
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        ... 2 more

Я не понимаючто там происходит.Я проверил на наличие проблем с памятью, но, кажется, все в порядке.Эти машины используются для обработки файлов большего размера (обычно десятков гигабайт).Почему соединение потеряно / отказано / сброшено?Есть ли какие-либо проблемы с тем, что Spark заранее создает схему таблиц, которая могла бы объяснить это?


UPDATED after Ram Ghadiyaram's answer :
Я пытался установить spark.network.timeout в 6000 с.Никакой другой параметр тайм-аута не настроен в среде.Результат кажется одинаковым.Задание завершается ошибкой через 10 минут, показывая те же ошибки в файле журнала: «сброс соединения по пиру», «сбой при отправке RPC» и т. Д.

Настройка spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout и spark.rpc.lookupTimeout с одинаковыми значениями (6000 с), похоже, тоже не работают.

Я думаю, что мой набор данных слишком грязный, чтобы все равно обрабатываться должным образом.Я попытаюсь изменить модель данных и затем снова запустить приложение с этими настройками времени ожидания.


Updated 01/07/2019 :
Я упростил модель данных.Модель была сложной, что приводило к некоторой пустой структуре в наборе данных, потому что система не могла связать некоторые поля по наследству.Я сплющил структуру так, чтобы каждый возможный тип присутствовал как фактический атрибут универсального класса, и поэтому я удалил наследование.

Чтобы обернуть это, что-то вроде этого:
File.class
| -field1
| -field2
| -field3
| - GenericClass
| -Class1
| -Class2
| -Class3

Вместо абстрактногокласс с некоторыми дочерьми я сделал общий с другими классами в качестве атрибутов.Это довольно грязно (и я не рекомендую делать это), но набор данных был намного чище.

У меня больше не было проблем с таймаутом после того, как это изменение было выполнено.Я думаю, что предыдущая модель была слишком грязной, чтобы ее можно было эффективно написать Spark.

Я пробовал писать в формате ORC и Avro, и оба были в порядке.В Avro мне удалось написать около 300000 строк за минуту, поэтому настройки тайм-аута по умолчанию больше не являются проблемой.

1 Ответ

1 голос
/ 31 мая 2019

В: Почему соединение потеряно / запрещено / сброшено?

org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)

Похоже, что это типичная проблема тайм-аута для большой или большой рабочей нагрузки.

IНе знаю, какую версию свечи вы используете.но его здесь нет.в основном то, что он делает, это будет ждать некоторое время (тайм-аут по умолчанию), и он потерпит неудачу.см. ThreadUtils

/**
   * Preferred alternative to `Await.result()`.
   *
   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
   * that this thread's stack trace appears in logs.
   *
   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
   * `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
   * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
   * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
   * In general, we should use this method because many places in Spark use [[ThreadLocal]] and it's
   * hard to debug when [[ThreadLocal]]s leak to other tasks.
   */
  @throws(classOf[SparkException])
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
      // See SPARK-13747.
      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
      awaitable.result(atMost)(awaitPermission)
    } catch {
      case e: SparkFatalException =>
        throw e.throwable
      // TimeoutException is thrown in the current thread, so not need to warp the exception.
      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
        throw new SparkException("Exception thrown in awaitResult: ", t)
    }
  }

Вы должны увеличить тайм-ауты. См. сетевые документы

spark.network.timeout 120 с. Тайм-аут по умолчанию для всех сетевых взаимодействий.,Этот конфиг будет использоваться вместо spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout или spark.rpc.lookupTimeout, если они не настроены.


Для суммирования: Для небольших рабочих нагрузок достаточно времени ожидания, для больших рабочих нагрузок необходимо увеличить время ожидания.

...