У нас есть потоковое приложение Spark (2.3.1), работающее через EMR (5.16), получающее данные от одного сегмента AWS Kinesis за последние 2 года.В последнее время (последние 2 месяца) в приложении возникают случайные ошибки.Мы попытались выполнить обновление до последней версии Spark и EMR, и ошибка не была решена.
Проблема
На ровном месте, в любой момент времени и времени безотказной работы и, по-видимому, нетиндикаторы в ОЗУ, ЦП, сети, Приемник прекращает отправку сообщений в Spark, и в журналах появляется эта ошибка:
18/09/11 18:20:00 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error while storing block into Spark - java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:210)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:158)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:133)
at org.apache.spark.streaming.kinesis.KinesisReceiver.org$apache$spark$streaming$kinesis$KinesisReceiver$$storeBlockWithRanges(KinesisReceiver.scala:306)
at org.apache.spark.streaming.kinesis.KinesisReceiver$GeneratedBlockHandler.onPushBlock(KinesisReceiver.scala:357)
at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
Приложение Spark продолжает работать без проблем, но не имеет записей дляprocess.
Единственное, что я нашел в интернете, это Github Issue https://github.com/awslabs/amazon-kinesis-client/issues/185
Что я пробовал
Окончательная версияконфигурации выглядит следующим образом
[
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.yarn.am.attemptFailuresValidityInterval": "1h",
"spark.yarn.maxAppAttempts": "4",
"spark.yarn.executor.failuresValidityInterval": "1h",
"spark.task.maxFailures": "8",
"spark.task.reaper.enabled": "true",
"spark.task.reaper.killTimeout": "120s",
"spark.metrics.conf": "metrics.properties",
"spark.metrics.namespace": "spark",
"spark.streaming.stopGracefullyOnShutdown": "true",
"spark.streaming.receiver.writeAheadLog.enable": "true",
"spark.streaming.receiver.writeAheadLog.closeFileAfterWrite": "true",
"spark.streaming.driver.writeAheadLog.closeFileAfterWrite": "true",
"spark.streaming.backpressure.enabled": "true",
"spark.streaming.backpressure.initialRate": "500",
"spark.dynamicAllocation.enabled": "false",
"spark.default.parallelism": "18",
"spark.driver.cores": "3",
"spark.driver.memory": "4G",
"spark.driver.memoryOverhead": "1024",
"spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops",
"spark.executor.instances": "3",
"spark.executor.cores": "3",
"spark.executor.memory": "4G",
"spark.executor.memoryOverhead": "1024",
"spark.executor.heartbeatInterval": "20s",
"spark.io.compression.codec": "zstd",
"spark.rdd.compress": "true",
"spark.executor.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedOops",
"spark.python.worker.memory": "640m",
"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
"spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored": "true",
"spark.network.timeout": "240s",
"spark.locality.wait": "0s",
"spark.eventLog.enabled": "false"
}
},
{
"Classification": "emrfs-site",
"Properties": {
"fs.s3.consistent.metadata.tableName": "HiddenName",
"fs.s3.consistent": "true",
"fs.s3.consistent.retryCount": "1",
"fs.s3.consistent.retryPeriodSeconds": "2",
"fs.s3.consistent.retryPolicyType": "fixed",
"fs.s3.consistent.throwExceptionOnInconsistency": "false"
}
}
]
Ничего там не помогло.
Обходные пути Чтобы Receiver снова отправлял сообщения, мне нужно перейти в интерфейс Spark, ДжобсНажмите и убейте активную работу (я не могу загрузить изображения, потому что у меня недостаточно репутации).Это вызывает новое приложение, и Receiver снова начинает получать записи в Spark.Конечно, это не решение для производственной системы.
Программный обходной путь Я написал StreamingListener, который захватывает onReceiverStopped
и onReceiverError
.Когда это происходит, создается сигнал (внешний файл), который проверяется каждый раз при обработке ssc.awaitTerminationOrTimeout(check_interval)
.Если сигнал был создан, приложение вызывает исключение, и приложение уничтожается.Это создает новый appattempt
и запускается новое потоковое приложение.Проблема с этим, во-первых, это то, что он не решает исходную проблему, а во-вторых, похоже, что после первого appattempt
StreamingListener больше не зарегистрирован, поэтому я не могу восстановить приложение.