Please check the code below.
1. Change the grouped function to par to load the data in parallel.
2. Change
// The code below adds the same path to the content of multiple files.
paths.grouped(groupsNum.getOrElse(paths.length))
  .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
    .withColumn("path", orderedPaths("path")))
to
// The code below adds each file's own path to its content.
paths
  .grouped(groupsNum.getOrElse(paths.length))
  .flatMap(group => {
    group.map(path => {
      pathToDataDF(path).withColumn("path", lit(path))
    })
  })
  .reduceLeft(_ union _)
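Note that lit comes from org.apache.spark.sql.functions and is not in scope by default in spark-shell, so the snippets below assume this import has been run first:

scala> import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.lit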
In the example below I have used both par and grouped to show you how each works.
Note: Ignore helper methods like pathToDataDF; I have tried to reproduce your methods.
scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]
scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame
// Sample file content I have used.
scala> import sys.process._
import sys.process._
scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}
scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}
scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}
Using par
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
scala> val parDF = paths match {
case p if !p.isEmpty => {
p.par
.map(path => {
pathToDataDF(path)
.withColumn("path",lit(path))
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> parDF.show(false)
+---+--------+------------------------------+
|age|name |path |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi |/tmp/data/folderb/folderb.json|
|25 |Raju |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+
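One caveat worth noting: union matches columns by position, not by name. Every file here yields the same schema (age, name), so the rows line up, but if your JSON files can differ in column order, reducing with unionByName (available since Spark 2.3) is safer:

// Safer reduction when schemas may differ in column order (Spark 2.3+).
p.par
  .map(path => pathToDataDF(path).withColumn("path", lit(path)))
  .reduceLeft(_ unionByName _)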
// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))
scala> val parDF = paths match {
case p if !p.isEmpty => {
p.par
.map(path => {
pathToDataDF(path._1)
.withColumn("path",lit(path._1))
.withColumn("time",lit(path._2))
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name |path |time |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
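By default par runs on the global fork-join pool, whose size matches the number of available processors, so many paths may load at once. If you want to cap the number of concurrent loads, you can attach a custom task support to the parallel collection. A minimal sketch, assuming Scala 2.12; the pool size 4 is an arbitrary example value:

import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool

val parPaths = paths.par
// Cap the parallel collection at 4 concurrent loads.
parPaths.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
val cappedDF = parPaths
  .map { case (path, time) =>
    pathToDataDF(path).withColumn("path", lit(path)).withColumn("time", lit(time))
  }
  .reduceLeft(_ union _)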
Using grouped
scala> val groupsNum: Option[Int] = None // assumption: the group-size option from your code; None means one group of all paths
groupsNum: Option[Int] = None
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
scala> val groupedDF = paths match {
case p if !p.isEmpty => {
p
.grouped(groupsNum.getOrElse(p.length))
.flatMap(group => {
group
.map(path => {
pathToDataDF(path)
.withColumn("path", lit(path))
})
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> groupedDF.show(false)
+---+--------+------------------------------+
|age|name |path |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi |/tmp/data/folderb/folderb.json|
|25 |Raju |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+
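A quick aside on groupsNum: grouped(n) splits the collection into chunks of size n, so groupsNum is the chunk size rather than the number of groups, and None falls back to a single group holding all paths. For example:

// Chunk size 2 over three paths gives two groups: (a, b) and (c).
List("a.json", "b.json", "c.json").grouped(2).toList
// List(List(a.json, b.json), List(c.json))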
// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))
scala> val groupedDF = paths match {
case p if !p.isEmpty => {
p
.grouped(groupsNum.getOrElse(p.length))
.flatMap(group => {
group
.map(path => {
pathToDataDF(path._1)
.withColumn("path",lit(path._1))
.withColumn("time",lit(path._2))
})
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name |path |time |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
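One last pointer: if you only need the source path and not the time column, Spark can tag each row while reading all files in a single pass using the built-in input_file_name function, which avoids the per-file union entirely. A minimal sketch using the earlier paths array of plain strings; note the returned path may be a full URI (for example file:///tmp/...):

import org.apache.spark.sql.functions.input_file_name

// One read over all paths; each row is tagged with the file it came from.
val taggedDF = spark.read.format("json")
  .load(paths: _*)
  .withColumn("path", input_file_name())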