У меня есть вложенная структура каталогов, которая выглядит следующим образом:
folder
│
folder
├── a
│ ├── b
│ │ └──c
│ │ └── d
│ │ └── e
│ │ └── 20180603121311
│ └── f
│ └── g
│ └──h
│ └──i
│ └── 20180603121511
└── b
│
├── w
│ └── x
│ └── y
│ └── z
│ └── 20180603121411
└── l
└── m
└── n
└──o
└── 20180603121411
Даты в конце древовидной структуры - это имена файлов.Это текстовые файлы без расширений.Данные в каталоге накапливаются до 26 ГБ, но я запускаю программу по дате, а не по всем датам одновременно.Каждый файл очень маленький, в КБ.
Моя оригинальная программа использовала аргумент даты, например: 20180603, а затем прошла все каталоги и создала массив всех путей, содержащих эту дату.Затем я прочитал бы каждый файл и проанализировал этот файл, используя Spark.После прочтения этого файла я добавил бы к нему новый столбец с именем datetime, который разделяет путь и получает последний элемент в этом пути (который является datetime).Затем я добавил бы этот фрейм данных к большему фрейму данных, используя union.
Однако чтение и запись программы занимали очень много времени для некоторых дат.Как я могу настроить свою программу так, чтобы она могла читать и записывать данные?Обратите внимание, что когда я говорю «долго», я имею в виду, что для некоторых дат требуется больше дня (данные не настолько велики, чтобы занимать так много времени).
for (inputPath <- pathBuffer) {
if (!errorHandler.isFileEmpty(inputPath) && !errorHandler.hasIncorrectExtension(inputPath)) {
val df = parse(inputPath, schema)
globalDF = globalDF.union(df)
}
}
def parse(inputPath: String, schema: StructType): DataFrame = {
try {
var df = spark.sqlContext.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
df = spark.sqlContext.createDataFrame(spark.sparkContext.textFile(inputPath)
.map(x => parseSchema.getRow(x)), schema)
df = df.withColumn("datetime", lit(inputPath.split("/").last))
df
}
catch {
case ex: Exception =>
throw ex
}
}
pathBuffer - это ListBuffer, которыйсодержит все собранные пути, содержащие эту дату.