Объединить несколько фреймов данных, переданных в foreach, в один фрейм данных - Scala spark - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть два CSV-файла, как показано ниже.

a.csv

ID,Name,Age,Subject
1,Arun,23,English
2,Melan,22,IT

b.csv 

ID,Name,Department_ID,Age,Subject
3,Kumar,004,21,Science
4,Sagar,008,20,IT

Как видите, эти файловые структуры различны. Я хочу только столбцы ID и Subject. Поэтому я перечисляю путь к файлам и выполняю следующие действия:

val cols = List("ID", "Subject")

val file_path = List("path to a.csv", "path to b.csv") 

file_path.foreach(path => {

      val df =
        spark
          .read
          .option( "header", "true" )
          .option( "delimiter", "," )
          .csv(path )
          .select(cols.head, cols.tail: _*)

      df.show()
      df.count()

    })

1-й кадр данных

## +---+--------+
## |ID|Subject  |
## +--+---------+
## | 1|  English|
## | 2|       IT|
## +--+---------+

2-й кадр данных

##+---+---------+
## |ID|Subject  |
## +--+---------+
## | 3|  Science|
## | 4|       IT|
## +--+---------+

Но мне нужен один кадр данных путем слияния эти два кадра данных. Как показано ниже,

## +---+--------+
## |ID|Subject  |
## +--+---------+
## | 1|  English|
## | 2|       IT|
## | 3|  Science|
## | 4|       IT|
## +--+---------+

Есть ли способ сделать это? Я не хочу записывать эти два кадра данных в файлы и читать их как один.

Спасибо.

Ответы [ 2 ]

1 голос
/ 29 апреля 2020

Используйте map & reduce вместо foreach метода для достижения этого.

Пожалуйста, проверьте ниже

scala> val dfr = spark.read.format("csv").option("header","true")
dfr: org.apache.spark.sql.DataFrameReader = org.apache.spark.sql.DataFrameReader@cd6ccda

scala> val paths = List("/tmp/data/da.csv","/tmp/data/db.csv")
paths: List[String] = List(/tmp/data/da.csv, /tmp/data/db.csv)

scala> val columns = List("id","subject").map(c => col(c))
columns: List[org.apache.spark.sql.Column] = List(id, subject)

scala> spark.time { paths.map(path => dfr.load(path).select(columns:_*)).reduce(_ union _).show(false) }
+---+-------+
|id |subject|
+---+-------+
|1  |English|
|2  |IT     |
|3  |Science|
|4  |IT     |
+---+-------+

Time taken: 247 ms

scala>

Edit Поскольку оба файла имеют разные схемы, загрузка все файлы сразу дадут вам неправильный результат, пожалуйста, проверьте ниже.

scala> val da = spark.read.option("header","true").csv("/tmp/data/da.csv")
da: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]

scala> da.show(false)
+---+-----+---+-------+
|id |name |age|subject|
+---+-----+---+-------+
|1  |Arun |23 |English|
|2  |Melan|22 |IT     |
+---+-----+---+-------+


scala> val db = spark.read.option("header","true").csv("/tmp/data/db.csv")
db: org.apache.spark.sql.DataFrame = [id: string, name: string ... 3 more fields]

scala> db.show(false)
+---+-----+-------------+---+-------+
|id |name |department_id|age|subject|
+---+-----+-------------+---+-------+
|3  |Kumar|004          |21 |Science|
|4  |Sagar|008          |20 |IT     |
+---+-----+-------------+---+-------+


scala> val paths = List("/tmp/data/da.csv","/tmp/data/db.csv")
paths: List[String] = List(/tmp/data/da.csv, /tmp/data/db.csv)

scala> val columns = List("id","subject").map(c => col(c))
columns: List[org.apache.spark.sql.Column] = List(id, subject)

scala> spark.read.option("header", "true" ).option("delimiter", "," ).csv(paths: _* ).select(columns:_*).show(false)
20/04/29 18:35:07 WARN CSVDataSource: CSV header does not conform to the schema.
 Header: id,
 Schema: id, subject
Expected: subject but found:
CSV file: file:///tmp/data/da.csv
+---+-------+
|id |subject|
+---+-------+
|3  |Science|
|4  |IT     |
|1  |null   |
|2  |null   |
+---+-------+

scala> spark.read.option("header", "true" ).option("delimiter", "," ).csv(paths: _* ).select("id","name").show(false) // common columns from both fiels - id,name
+---+-----+
|id |name |
+---+-----+
|3  |Kumar|
|4  |Sagar|
|1  |Arun |
|2  |Melan|
+---+-----+

scala> spark.read.option("header", "true" ).option("delimiter", "," ).csv(paths: _* ).select("id","name","age").show(false) // file-1 has - id,name,age, file-2 has - id,name,department_id,age , in this age came after department_id
20/04/29 18:43:53 WARN CSVDataSource: CSV header does not conform to the schema.
 Header: id, name, subject
 Schema: id, name, age
Expected: age but found: subject
CSV file: file:///tmp/data/da.csv
+---+-----+-------+
|id |name |age    |
+---+-----+-------+
|3  |Kumar|21     |
|4  |Sagar|20     |
|1  |Arun |English|
|2  |Melan|IT     |
+---+-----+-------+
0 голосов
/ 29 апреля 2020

Spark Dataframe имеет встроенную функцию загрузки из нескольких файлов одновременно. Я думаю, что вместо загрузки их по отдельности, а затем присоединения к ним, вы можете просто загрузить их в один вызов, как показано ниже.

object LoadJoinDataframe {

  def main(args: Array[String]): Unit = {
    val cols = List("ID", "Subject")

    val file_path = List("path to a.csv", "path to b.csv")


    val spark = Constant.getSparkSess
    val df = spark
      .read
      .option( "header", "true" )
      .option( "delimiter", "," )
      .csv(file_path: _* )
      .select(cols.head, cols.tail: _*)
    df.show()
    df.count()

  }

}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...