Я пытаюсь проанализировать набор XML-файлов, используя Scala и Spark. Я получаю данные для 'n' фреймов данных из файлов (т. Е. Количество фреймов данных не меняется, меняется только количество файлов)
Я анализирую набор файлов XML и сохраняю данные в ListBuffer[ListBuffer[String]]
. Каждый из ListBuffer[String]
содержит данные для фрейма данных. например:
ListBuffer[
ListBuffer["1|2|3|4","5|6|7|8"],
ListBuffer["a|b|c|d","e|f|g|h"],
ListBuffer["q|w|e|r","w|x|y|z"]
]
Это создаст 3 кадра данных:
Dataframe1:
col1 col2 col3 col4
1 2 3 4
5 6 7 8
и аналогично другим 2 кадрам данных.
Я не могу напрямую преобразовать XML в Датафрейм, так какПеред созданием dataframe необходимо выполнить много пользовательских обработок.
Я преобразую ListBuffer в Dataframe, используя следующий код:
finalListBuffer.foreach{ data =>
columns = FunctionToReturnColumnsList()
val schema = StructType(columns.map(field => StructField(field, StringType, true)))
val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("|", -1))))
val df = sparkSess.createDataFrame(dataRDD, schema)
...
}
После этого шага некоторые операции выполняютсявыполняется на каждом фрейме данных (некоторые операции имеют зависимость между фреймами данных, поэтому я не могу просто обработать один фрейм данных, а затем записать), и, наконец, фреймы данных записываются с использованием следующего кода:
df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)
При выполнении этих шагов яполучаю 2 проблемы при большом размере входного файла:
1) Превышен лимит накладных расходов ГХ при создании кадра данных. (Шаг, в котором создается переменная dataRDD
)
2) Удар сердцебиенияошибка тайм-аута при записи df.
Как решить эти проблемы?
Я думал об использовании ListBuffer[RDD[String]]
изначально вместо ListBuffer[ListBuffer[String]]
Но может быть какмлюбой как 1 миллион файлов, и каждый файл может иметь до 10-20 записей для df. Что я делаю, так это перечисляю все файлы, обрабатываю каждый из них по одному и добавляю их результат в основной ListBuffer. Так что, если я использую RDD, мне придется использовать, union для каждого файла, и это может быть дорого. Что еще можно сделать?