Как уже говорилось, у вас есть 3 варианта здесь.
В моем примере я использую следующие 3 набора данных:
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1 |100 |200 |
|2 |300 |400 |
+----+----+----+
+----+----+----+
|col1|col2|col3|
+----+----+----+
|3 |60 |80 |
|4 |12 |100 |
|5 |20 |10 |
+----+----+----+
+----+----+----+
|col1|col2|col3|
+----+----+----+
|7 |20 |40 |
|8 |30 |40 |
+----+----+----+
Вы сначала создаете свою схему (быстрее определить схему, а не выводить ее):
import org.apache.spark.sql.types._
val df_schema =
StructType(
List(
StructField("col1", IntegerType, true),
StructField("col2", IntegerType, true),
StructField("col3", IntegerType, true)))
Опция 1:
Загрузка всех CSV одновременно с помощью:
val df1 = spark
.read
.option("header", "false")
.option("delimiter", ",")
.option("inferSchema", "false")
.schema(df_schema)
.csv("file:///C:/data/*.csv")
Затем примените свою логику ко всей группе наборов данных по имени файла.
Условие : Вы должны найти способ добавить имя файла к каждому файлу
Опция 2:
Загрузка CSV-файлов из каталога.Затем переберите файлы и создайте информационный кадр для каждого CSV.Внутри цикла примените свою логику к каждому CSV.Наконец, в конце цикла добавьте (объедините) результаты во второй кадр данных, в котором будут храниться ваши накопленные результаты.
Внимание: Имейте в виду, что большое количество файлов может привести к очень большой DAG и, как следствие, к огромному плану выполнения, во избежание этого вы можете сохранить текущие результаты или сбор вызовов,В приведенном ниже примере я предполагал, что persist или collect будет выполняться для каждой итерации bufferSize.Вы можете настроить или даже удалить эту логику в соответствии с количеством CSV-файлов.
Это пример кода для 2nd option :
import java.io.File
import org.apache.spark.sql.Row
import spark.implicits._
val dir = "C:\\data_csv\\"
val csvFiles = new File(dir).listFiles.filter(_.getName.endsWith(".csv"))
val bufferSize = 10
var indx = 0
//create an empty df which will hold the accumulated results
var bigDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], df_schema)
csvFiles.foreach{ path =>
var tmp_df = spark
.read
.option("header", "false")
.option("delimiter", ",")
.option("inferSchema", "false")
.schema(df_schema)
.csv(path.getPath)
//execute your custom logic/calculations with tmp_df
if((indx + 1) % bufferSize == 0){
// If buffer size reached then
// 1. call unionDf.persist() or unionDf.collect()
// 2. in the case you use collect() load results into unionDf again
}
bigDf = bigDf.union(tmp_df)
indx = indx + 1
}
bigDf.show(false)
Это должно привести к выводу:
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1 |100 |200 |
|2 |300 |400 |
|3 |60 |80 |
|4 |12 |100 |
|5 |20 |10 |
|7 |20 |40 |
|8 |30 |40 |
+----+----+----+
Вариант 3:
Последний вариант - использовать встроенную функцию spark.sparkContext.wholeTextFiles
.
Это код для загрузки всех файлов CSV в RDD:
val data = spark.sparkContext.wholeTextFiles("file:///C:/data_csv/*.csv")
val df = spark.createDataFrame(data)
df.show(false)
И вывод:
+--------------------------+--------------------------+
|_1 |_2 |
+--------------------------+--------------------------+
|file:/C:/data_csv/csv1.csv|1,100,200 |
| |2,300,400 |
|file:/C:/data_csv/csv2.csv|3,60,80 |
| |4,12,100 |
| |5,20,10 |
|file:/C:/data_csv/csv3.csv|7,20,40 |
| |8,30,40 |
+--------------------------+--------------------------+
spark.sparkContext.wholeTextFiles
вернет ключ / значениеСДР, в которой ключом является путь к файлу, а значением - данные файла.
Требуется дополнительный код для извлечения содержимого _2, которое является содержимым каждого CSV.По моему мнению, это может быть связано с дополнительными затратами на производительность и ремонтопригодность программы, поэтому я бы ее избежал.
Дайте мне знать, если вам нужны дополнительные разъяснения