Spark: Как получить последний файл с s3 за последние 10 дней - PullRequest
1 голос
/ 07 мая 2019

Я пытаюсь получить последний файл из s3 за последние 10 дней, когда на входе нет файла.Проблема в том, что путь содержит дату.

Мой путь такой:

val path = "s3://bucket-info/folder1/folder2"

val date = "2019/04/12"    ## YYYY/MM/DD

Я делаю это =

 val update_path = path+"/" +date //this will become s3://bucket-info/folder1/folder2/2019/04/12 


def fileExist(path: String, sc: SparkContext): Boolean = FileSystem.get(getS3OrFileUri(path),
  sc.hadoopConfiguration).exists(new Path(path + "/_SUCCESS"))


if (fileExist(update_path, sc)) {
    //read and process the file

} else {
       log("File not exist")
       // I need to get the latest file in the last five days and use. So that I can check "s3://bucket-info/folder1/folder2/2019/04/11" , s3://bucket-info/folder1/folder2/2019/04/10 and others. If no latest file in last 5 days. throw error. s

}

Но моя проблема в том, как мне проверить, когда наступает конецмесяц?Я могу сделать это для цикла, но есть ли оптимизированный и элегантный способ сделать это в искре?

1 Ответ

1 голос
/ 07 мая 2019

Не очень оптимально, но если вы хотите использовать Spark, программа чтения фреймов данных может использовать несколько путей, а input_file_name дает вам путь:

val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12"
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val end = LocalDate.parse(date, fmt)
val prefixes = (0 until 10).map(end.minusDays(_)).map(d => s"$path/${fmt.format(d)}")

val prefix = spark.read
  .textFile(prefixes:_*)
  .select(input_file_name() as "file")
  .distinct()
  .orderBy(desc("file"))
  .limit(1)
  .collect().collectFirst {
  case Row(prefix: String) => prefix
}

prefix.fold {
  // log error
}
{ path =>
  //read and process the file
}

Это довольно неэффективно, и нет ясного путивокруг этого использование Spark в качестве реализации файловой системы S3 Hadoop не очень эффективно с использованием рекурсивных структур.Если вы хотите использовать S3 API напрямую, вы можете установить s"$path/${fmt.format(end.minusDays(10))}" в качестве start после параметра и использовать что-то вроде , например, для вывода списка ключей.Это работает, поскольку S3 всегда возвращает списки клавиш, отсортированные в алфавитном порядке, и у вас есть нулевое заполнение в ключах даты.

...