Я запускаю запланированное (один раз в день) приложение Spark на AWS EMR, приложение является алгоритмом рекомендации на основе spark.ml.recommendation.ALS, данные расположены на AWS S3, приложение выводит рекомендации для группыпользователя.
Чтобы обеспечить надежную работу алгоритма итерации, я использовал функцию контрольной точки spark. Я устанавливаю папку контрольных точек на AWS S3.
Иногда все работает нормально. Но иногда приложению spark не удается найти файл в папке контрольных точек, даже если файл действительно существует. Я не знаю почему.
вот типичный журнал ошибок: 19/10/30 13:46:01 ПРЕДУПРЕЖДЕНИЕ TaskSetManager: Потерянное задание 5.0 на этапе 873.0 (TID 12169, ip-10-79-9-182.us-west-2.compute.internal, исполнитель 5): java.io.FileNotFoundException: такого файла или каталога нет: s3a: // имя-корзины / контрольная точка / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005 в org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus (S3AFileSystem.java:1642) в org.apache.hadoop.fs.s3a.S3AFileSystem.open (S3AFileSystem.java:521) или.apache.spark.rdd. RDD.scala: 324)
Я проверил, что в хранилище S3 есть s3a: // имя-корзины / контрольная точка / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005.
мои подробные шаги следующие:
- создать папку контрольных точек на s3;
- setCheckpointDir в только что созданной папке;
- алгоритм запуска;
- удалить папку контрольной точки для очистки.
вот мои коды scala:
//step 1
val pathString = "s3a://bucket-name/checkpoint"
val path = new Path(pathString)
val fileSystem = FileSystem.get(path.toUri, sparkContext.hadoopConfiguration)
fileSystem.mkdirs(path)
//step 2
sparkContext.setCheckpointDir(pathString)
//step 3
//... lots of data that not so relevant
val als = new ALS()
.setRank(10)
.setMaxIter(20)
.setUserCol("userId")
.setItemCol("clubId")
.setRatingCol("rating")
.setCheckpointInterval(10)
.setColdStartStrategy("drop")
.setPredictionCol("prediction")
//... another lots of data that not so relevant
//step 4
fileSystem.delete(path, recursive = true)