[Случайно появляется] [Spark ML ALS] [AWS EMR] FileNotFoundException в папке контрольных точек, но файл существует - PullRequest
0 голосов
/ 31 октября 2019

Я запускаю запланированное (один раз в день) приложение Spark на AWS EMR, приложение является алгоритмом рекомендации на основе spark.ml.recommendation.ALS, данные расположены на AWS S3, приложение выводит рекомендации для группыпользователя.

Чтобы обеспечить надежную работу алгоритма итерации, я использовал функцию контрольной точки spark. Я устанавливаю папку контрольных точек на AWS S3.

Иногда все работает нормально. Но иногда приложению spark не удается найти файл в папке контрольных точек, даже если файл действительно существует. Я не знаю почему.

вот типичный журнал ошибок: 19/10/30 13:46:01 ПРЕДУПРЕЖДЕНИЕ TaskSetManager: Потерянное задание 5.0 на этапе 873.0 (TID 12169, ip-10-79-9-182.us-west-2.compute.internal, исполнитель 5): java.io.FileNotFoundException: такого файла или каталога нет: s3a: // имя-корзины / контрольная точка / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005 в org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus (S3AFileSystem.java:1642) в org.apache.hadoop.fs.s3a.S3AFileSystem.open (S3AFileSystem.java:521) или.apache.spark.rdd. RDD.scala: 324)

Я проверил, что в хранилище S3 есть s3a: // имя-корзины / контрольная точка / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005.

мои подробные шаги следующие:

  1. создать папку контрольных точек на s3;
  2. setCheckpointDir в только что созданной папке;
  3. алгоритм запуска;
  4. удалить папку контрольной точки для очистки.

вот мои коды scala:

//step 1
val pathString = "s3a://bucket-name/checkpoint"
val path = new Path(pathString)
val fileSystem = FileSystem.get(path.toUri, sparkContext.hadoopConfiguration)
fileSystem.mkdirs(path)

//step 2
sparkContext.setCheckpointDir(pathString)

//step 3
//... lots of data that not so relevant
val als = new ALS()
      .setRank(10)
      .setMaxIter(20)
      .setUserCol("userId")
      .setItemCol("clubId")
      .setRatingCol("rating")
      .setCheckpointInterval(10)
      .setColdStartStrategy("drop")
      .setPredictionCol("prediction")
//... another lots of data that not so relevant

//step 4
fileSystem.delete(path, recursive = true)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...