Я передаю данные Кафки, как показано ниже:
final JavaPairDStream<String, Row> transformedMessages =
rtStream
.mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
.mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()
.foreachRDD(rdd -> {
--logic goes here
});
У меня есть четыре рабочих потока и несколько исполнителей для этого приложения, и я пытаюсь проверить отказоустойчивость Spark.
Поскольку мы используем mapWithState, spark создает контрольные точки для данных в HDFS, поэтому, если какой-либо исполнитель / работник выйдет из строя, мы сможем восстановить потерянные данные (данные, потерянные в мертвом исполнителе), и продолжить работу с оставшимися исполнителями / работниками. .
Так что я убиваю один из рабочих узлов, чтобы проверить, все ли работает приложение без сбоев, но вместо этого я получаю исключение FileNotFound в HDFS, как показано ниже:
Это немного странно, так как Spark проверял данные иногда в HDFS, почему он не может их найти. Очевидно, HDFS не удаляет какие-либо данные, так почему это исключение.
Или я что-то здесь упускаю?
[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)
Дальнейшее обновление:
Я обнаружил, что RDD, который Spark пытается найти в HDFS, уже удален процессом «ReliableRDDCheckpointData», и он создал новый RDD для данных контрольной точки.
DAG как-то указывает на этот старый RDD. Если бы были какие-либо ссылки на эти данные, они не должны были быть удалены.