Spine Streaming Kinesis на EMR выдает «Ошибка при сохранении блока в Spark» - PullRequest
0 голосов
/ 15 сентября 2018

У нас есть потоковое приложение Spark (2.3.1), работающее через EMR (5.16), получающее данные от одного сегмента AWS Kinesis за последние 2 года.В последнее время (последние 2 месяца) в приложении возникают случайные ошибки.Мы попытались выполнить обновление до последней версии Spark и EMR, и ошибка не была решена.

Проблема
На ровном месте, в любой момент времени и времени безотказной работы и, по-видимому, нетиндикаторы в ОЗУ, ЦП, сети, Приемник прекращает отправку сообщений в Spark, и в журналах появляется эта ошибка:

18/09/11 18:20:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error while storing block into Spark - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:210)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:306)
at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:357)
at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)

Приложение Spark продолжает работать без проблем, но не имеет записей дляprocess.

Единственное, что я нашел в интернете, это Github Issue https://github.com/awslabs/amazon-kinesis-client/issues/185

Что я пробовал
Окончательная версияконфигурации выглядит следующим образом

[
  {
    "Classification": "capacity-scheduler",
    "Properties": {
      "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.maxAppAttempts": "4",
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.task.maxFailures": "8",
      "spark.task.reaper.enabled": "true",
      "spark.task.reaper.killTimeout": "120s",
      "spark.metrics.conf": "metrics.properties",
      "spark.metrics.namespace": "spark",
      "spark.streaming.stopGracefullyOnShutdown": "true",
      "spark.streaming.receiver.writeAheadLog.enable": "true",
      "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite": "true",
      "spark.streaming.driver.writeAheadLog.closeFileAfterWrite": "true",
      "spark.streaming.backpressure.enabled": "true",
      "spark.streaming.backpressure.initialRate": "500",
      "spark.dynamicAllocation.enabled": "false",
      "spark.default.parallelism": "18",
      "spark.driver.cores": "3",
      "spark.driver.memory": "4G",
      "spark.driver.memoryOverhead": "1024",
      "spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops",
      "spark.executor.instances": "3",
      "spark.executor.cores": "3",
      "spark.executor.memory": "4G",
      "spark.executor.memoryOverhead": "1024",
      "spark.executor.heartbeatInterval": "20s",
      "spark.io.compression.codec": "zstd",
      "spark.rdd.compress": "true",
      "spark.executor.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops",
      "spark.python.worker.memory": "640m",
      "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
      "spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored": "true",
      "spark.network.timeout": "240s",
      "spark.locality.wait": "0s",
      "spark.eventLog.enabled": "false"
    }
  },
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.consistent.metadata.tableName": "HiddenName",
      "fs.s3.consistent": "true",
      "fs.s3.consistent.retryCount": "1",
      "fs.s3.consistent.retryPeriodSeconds": "2",
      "fs.s3.consistent.retryPolicyType": "fixed",
      "fs.s3.consistent.throwExceptionOnInconsistency": "false"
    }
  }
]

Ничего там не помогло.

Обходные пути Чтобы Receiver снова отправлял сообщения, мне нужно перейти в интерфейс Spark, ДжобсНажмите и убейте активную работу (я не могу загрузить изображения, потому что у меня недостаточно репутации).Это вызывает новое приложение, и Receiver снова начинает получать записи в Spark.Конечно, это не решение для производственной системы.

Программный обходной путь Я написал StreamingListener, который захватывает onReceiverStopped и onReceiverError.Когда это происходит, создается сигнал (внешний файл), который проверяется каждый раз при обработке ssc.awaitTerminationOrTimeout(check_interval).Если сигнал был создан, приложение вызывает исключение, и приложение уничтожается.Это создает новый appattempt и запускается новое потоковое приложение.Проблема с этим, во-первых, это то, что он не решает исходную проблему, а во-вторых, похоже, что после первого appattempt StreamingListener больше не зарегистрирован, поэтому я не могу восстановить приложение.

...