У меня есть inputDf , который мне нужно разделить на основе столбцов origin и destination и сохранить каждую уникальную комбинацию в другой файл csv.
(Использование Spark 2.4.4)
val spark: SparkSession = SparkSession.builder().appName("Test").getOrCreate()
val inputRdd: RDD[(String, String, String, String, String, String)] = spark.sparkContext.parallelize(Seq(
("City1", "City2", "Sedan", "AE1235", "80", "2020-02-01"),
("City2", "City3", "Hatchback", "XY5434", "100", "2020-02-01"),
("City3", "City1", "Sedan", "YU3456", "120", "2020-02-01"),
("City3", "City2", "Sedan", "BV3555", "105", "2020-02-01"),
("City2", "City1", "SUV", "PO1234", "75", "2020-02-01"),
("City1", "City3", "SUV", "TY4123", "125", "2020-02-01"),
("City1", "City2", "Hatchback", "VI3415", "85", "2020-02-01"),
("City1", "City2", "SUV", "VF1244", "84", "2020-02-01"),
("City3", "City1", "Sedan", "EW1248", "124", "2020-02-01"),
("City2", "City1", "Hatchback", "GE576", "82", "2020-02-01"),
("City3", "City2", "Sedan", "PK2144", "104", "2020-02-01"),
("City3", "City1", "Hatchback", "PJ1244", "118", "2020-02-01"),
("City3", "City2", "SUV", "WF0976", "98", "2020-02-01"),
("City1", "City2", "Sedan", "WE876", "78", "2020-02-01"),
("City2", "City1", "Hatchback", "AB5467", "80", "2020-02-01")
))
val inputDf = spark.createDataFrame(inputRdd).toDF("origin", "destination", "vehicleType", "uniqueId", "distanceTravelled", "date")
Пример вывода:
CSV-файл 1:
origin,destination,vehicleType,uniqueId,distanceTravelled,date
City1,City2,Sedan,AE1235,80,2020-02-01
City1,City2,Hatchback,VI3415,85,2020-02-01
City1,City2,SUV,VF1244,84,2020-02-01
City1,City2,Sedan,WE876,78,2020-02-01
CSV файл 2:
origin,destination,vehicleType,uniqueId,distanceTravelled,date
City3,City1,Sedan,YU3456,120,2020-02-01
City3,City1,Sedan,EW1248,124,2020-02-01
City3,City1,Hatchback,PJ1244,118,2020-02-01
csv файл 3:
origin,destination,vehicleType,uniqueId,distanceTravelled,date
City2,City1,SUV,PO1234,75,2020-02-01
City2,City1,Hatchback,GE576,82,2020-02-01
City2,City1,Hatchback,AB5467,80,2020-02-01
До сих пор я пытался получить уникальные комбинации в кортеж и затем использовать foreach для него, фильтруя каждый inputDf время сохранения отфильтрованного фрейма данных в csv
val tuple = inputDf.groupBy("origin","destination").count()
.select("origin","destination").rdd.map(r => (r(0),r(1))).collect
tuple.foreach(row => {
val origin = row._1
val destination = row._2
val dataToWrite = inputDf.filter(inputDf.col("origin").equalTo(origin) && inputDf.col("destination").equalTo(destination))
dataToWrite.repartition(1).write.mode("overwrite").format("csv").option("header", "true").save("/path/to/output/folder/" + origin + "-" + destination + ".csv")
})
Этот подход занимает много времени, поскольку включает в себя фильтрацию inputDf каждый раз, так как количество уникальных комбинаций довольно велико. Каков оптимальный способ сделать это?
РЕДАКТИРОВАТЬ: Каждый inputDf будет иметь данные только для одной даты.
Выходные данные должны содержать файлы на уровне даты.
Как:
/ output / City1-City2 / 2020-02-01.csv
/ output / City1-City2 / 2020-02-02.csv
/ output / City1-City2 / 2020-02-03.csv
/ output / City3-City1 / 2020-02-01.csv
/ output / City3-City1 / 2020-02-02.csv
... и т. Д.