Чтение файлов в структуре вложенных каталогов - PullRequest
0 голосов
/ 16 октября 2018

У меня есть вложенная структура каталогов, которая выглядит следующим образом:

folder
 │
 folder
 ├── a
 │   ├── b
 │   │   └──c
 │   │      └── d
 │   │          └── e
 │   │              └── 20180603121311
 │   └── f
 │       └── g
 │           └──h
 │              └──i
 │                 └── 20180603121511
 └── b     
     │   
     ├── w
     │   └── x
     │       └── y 
     │           └── z 
     │               └── 20180603121411
     └── l
         └── m 
             └── n 
                 └──o 
                    └── 20180603121411

Даты в конце древовидной структуры - это имена файлов.Это текстовые файлы без расширений.Данные в каталоге накапливаются до 26 ГБ, но я запускаю программу по дате, а не по всем датам одновременно.Каждый файл очень маленький, в КБ.

Моя оригинальная программа использовала аргумент даты, например: 20180603, а затем прошла все каталоги и создала массив всех путей, содержащих эту дату.Затем я прочитал бы каждый файл и проанализировал этот файл, используя Spark.После прочтения этого файла я добавил бы к нему новый столбец с именем datetime, который разделяет путь и получает последний элемент в этом пути (который является datetime).Затем я добавил бы этот фрейм данных к большему фрейму данных, используя union.

Однако чтение и запись программы занимали очень много времени для некоторых дат.Как я могу настроить свою программу так, чтобы она могла читать и записывать данные?Обратите внимание, что когда я говорю «долго», я имею в виду, что для некоторых дат требуется больше дня (данные не настолько велики, чтобы занимать так много времени).

  for (inputPath <- pathBuffer) {
if (!errorHandler.isFileEmpty(inputPath) && !errorHandler.hasIncorrectExtension(inputPath)) {

  val df = parse(inputPath, schema)

    globalDF = globalDF.union(df)
 }
}

  def parse(inputPath: String, schema: StructType): DataFrame = {
    try {
      var df = spark.sqlContext.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

      df = spark.sqlContext.createDataFrame(spark.sparkContext.textFile(inputPath)
        .map(x => parseSchema.getRow(x)), schema)

      df = df.withColumn("datetime", lit(inputPath.split("/").last))

      df
    }

    catch {
      case ex: Exception =>
        throw ex
    }
  }

pathBuffer - это ListBuffer, которыйсодержит все собранные пути, содержащие эту дату.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...