Add additional columns to a Spark DataFrame
0 votes
/ May 29, 2020

I am parsing Spark DataFrames from file paths, but now I would like to add the paths themselves, along with the time, to the resulting DataFrame as separate columns. Here is the current solution (pathToDF is a helper method):

val paths = pathsDF
  .orderBy($"time")
  .select($"path")
  .as[String]
  .collect()

if(paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(_.map(pathToDF).reduceLeft(_ union _))
} else {
  Seq.empty[DataFrame]
}

I am trying to do something like the following, but I am not sure how to add the time column using withColumn:

val orderedPaths = pathsDF
  .orderBy($"time")
  .select($"path")
  //.select($"path", $"time") for both columns

val paths = orderedPaths
  .as[String]
  .collect()

if (paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
      .withColumn("path", orderedPaths("path")))
    //.withColumn("time", orderedPaths("time")) something like this
} else {
  Seq.empty[DataFrame]
}

What is the best way to implement this?

Input DF:

time Long
path String

Current result:

resultDF schema
field1 Int
field2 String
....
fieldN String

Expected result:

resultDF schema
field1 Int
field2 String
....
path   String
time   Long

1 Answer

1 vote
/ May 29, 2020

Please check the code below.

1. Change grouped to the par function to load the data in parallel.

2. Change

// Problem: the code below attaches the same path to the content of multiple files,
// and orderedPaths("path") cannot be resolved against the unioned DataFrame.
paths.grouped(groupsNum.getOrElse(paths.length))
     .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
     .withColumn("path", orderedPaths("path"))) 

to

// The code below attaches each file's own path to that file's content.
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
    group.map(path => {
        pathToDataDF(path).withColumn("path", lit(path)) 
        }
    )
})
.reduceLeft(_ union _)

In the examples below I have used both par and grouped to show you both options.

Note: ignore helper methods such as pathToDataDF; I have tried to reproduce your methods.

scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]

scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame

// Sample file contents I have used.

scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}

scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}

scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}

Using par

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val parDF = paths match {
        case p if !p.isEmpty => {
            p.par
            .map(path => { 
                pathToDataDF(path)
                .withColumn("path",lit(path))
            }).reduceLeft(_ union _)
        }
        case _ => spark.emptyDataFrame
    }
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> parDF.show(false)
+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val parDF = paths match {
            case p if !p.isEmpty => {
                p.par
                .map(path => {
                    pathToDataDF(path._1)
                    .withColumn("path",lit(path._1))
                    .withColumn("time",lit(path._2))
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
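
A note on types: in this demo time is a String, while the question's input schema declares time as Long. Assuming your pathsDF really holds time: Long (for example epoch millis), collecting the tuples as (String, Long) makes lit(time) produce a LongType column directly, so the result matches the expected schema. A minimal sketch under that assumption:

// Sketch only: assumes pathsDF has time: Long, per the input schema.
val typedPaths = pathsDF
  .orderBy($"time")
  .select($"path", $"time")
  .as[(String, Long)]
  .collect

val typedDF = typedPaths.par
  .map { case (path, time) =>
    pathToDataDF(path)
      .withColumn("path", lit(path)) // StringType column
      .withColumn("time", lit(time)) // LongType column, no cast needed
  }
  .reduceLeft(_ union _)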

Using grouped

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val groupsNum: Option[Int] = None // assumed here: comes from your scope; None falls back to the full length
groupsNum: Option[Int] = None

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                p
                .grouped(groupsNum.getOrElse(p.length))
                .flatMap(group => {
                    group
                    .map(path => {
                        pathToDataDF(path)
                        .withColumn("path", lit(path))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> groupedDF.show(false)

+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                p
                .grouped(groupsNum.getOrElse(p.length))
                .flatMap(group => {
                    group
                    .map(path => {
                        pathToDataDF(path._1)
                        .withColumn("path",lit(path._1))
                        .withColumn("time",lit(path._2))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }


groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
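
As an aside, if only the path column were needed, Spark's built-in input_file_name() function can tag each row with its source file without collecting and looping over paths at all, and time can then be joined back in from orderedPaths. A sketch of that alternative; note that input_file_name() returns a fully qualified URI (for example file:///tmp/...), so the join key may need normalizing first:

import org.apache.spark.sql.functions.input_file_name

// Load all files in a single read; Spark tracks each row's source file.
val allPaths = orderedPaths.select($"path").as[String].collect
val withPathDF = spark.read.format("json")
  .load(allPaths: _*)
  .withColumn("path", input_file_name())

// Attach time by joining back on path (after normalizing the URI if needed).
val resultDF = withPathDF.join(orderedPaths, Seq("path"), "left")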
