Spark не может найти данные контрольных точек в HDFS после сбоя исполнителя - PullRequest
0 голосов
/ 28 августа 2018

Я передаю данные Кафки, как показано ниже:

final JavaPairDStream<String, Row> transformedMessages = 


    rtStream
                    .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))                
                    .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()                        
                    .foreachRDD(rdd -> {
                    --logic goes here
                    }); 

У меня есть четыре рабочих потока и несколько исполнителей для этого приложения, и я пытаюсь проверить отказоустойчивость Spark.

Поскольку мы используем mapWithState, spark создает контрольные точки для данных в HDFS, поэтому, если какой-либо исполнитель / работник выйдет из строя, мы сможем восстановить потерянные данные (данные, потерянные в мертвом исполнителе), и продолжить работу с оставшимися исполнителями / работниками. .

Так что я убиваю один из рабочих узлов, чтобы проверить, все ли работает приложение без сбоев, но вместо этого я получаю исключение FileNotFound в HDFS, как показано ниже:

Это немного странно, так как Spark проверял данные иногда в HDFS, почему он не может их найти. Очевидно, HDFS не удаляет какие-либо данные, так почему это исключение.

Или я что-то здесь упускаю?

[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
                org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
                at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
                at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
                at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
                at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at scala.Option.map(Option.scala:146)
                at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)

Дальнейшее обновление: Я обнаружил, что RDD, который Spark пытается найти в HDFS, уже удален процессом «ReliableRDDCheckpointData», и он создал новый RDD для данных контрольной точки. DAG как-то указывает на этот старый RDD. Если бы были какие-либо ссылки на эти данные, они не должны были быть удалены.

1 Ответ

0 голосов
/ 28 декабря 2018

Рассмотрим этот конвейер преобразования в потоке Spark:

rtStream
                    .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))                
                    .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()                        
                    .foreachRDD(rdd -> {
                      if(counter ==1){
                       --convert RDD to Dataset, and register it as a SQL table names "InitialDataTable"
                      } else
                       --convert RDD to Dataset, and register it as a SQL table names "ActualDataTable"


                    }); 

mapWithState связан с автоматической контрольной точкой данных состояния после каждого пакета, поэтому каждый «rdd» в указанном выше блоке «forEachRdd» является контрольной точкой, и при контрольной точке он перезаписывает предыдущую контрольную точку (поскольку, очевидно, последнее состояние должно оставаться в контрольно-пропускной пункт)

но давайте скажем, что если пользователь все еще использует rdd номер 1, так как в моем случае я регистрирую самый первый rdd в качестве другой таблицы, а каждый второй rdd - в качестве другой таблицы, он не должен быть перезаписан. (то же самое в Java, если что-то ссылается на ссылку на объект, этот объект не будет иметь право на сборку мусора)

Теперь, когда я пытаюсь получить доступ к таблице «InitialDataTable», очевидно, что «rdd», использованный для создания этой таблицы, больше не находится в памяти, поэтому он перейдет в HDFS, чтобы восстановить это из контрольной точки, и не найдет это там также, потому что это было перезаписано на следующем rdd, и приложение spark перестает указывать причину.

"org.apache.spark.SparkException: задание прервано из-за сбоя этапа: не удалось создать задачу: java.io.FileNotFoundException: файл не существует: hdfs: // mycluster / user / user1 / sparkCheckpointData / 2db59817-d954- 41a7-9b9d-4ec874bc86de / РДД-1005 / часть-00000"

Итак, чтобы решить эту проблему, мне пришлось явно указать контрольную точку первого rdd.

...