Создать DataFrame / Dataset, используя заголовок и данные в двух разных каталогах - PullRequest
0 голосов
/ 06 июня 2018

Я получаю входной файл в формате CSV.Здесь я получаю две директории, у первой директории будет один файл с записью заголовка, а у второй директории будут файлы данных.Здесь я хочу создать Dataframe / Dataset.

Один из способов, который я могу сделать, - это создать класс дел и разделить файлы данных по разделителю, присоединить схему и создать dataFrame.

Что я смотрю, так это прочитайте заголовочный файл и файл данных и создайте dataFrame.Я видел решение, использующее блоки данных, но в моей организации есть ограничения на использование блоков данных, и ниже приведен код, с которым я сталкиваюсь.Можете ли вы помочь мне с решением без использования блоков данных.

val headersDF = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("path to headers.csv")

val schema = headersDF.schema

val dataDF = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .schema(schema)
  .load("path to data.csv")

Ответы [ 2 ]

0 голосов
/ 06 июня 2018

Поскольку в вашем CSV-файле заголовка нет данных, нет смысла выводить из него схему.Так что просто получите имена полей, прочитав его.

val headerRDD = sc.parallelize(Seq(("Name,Age,Sal"))) //Assume this line is in your Header CSV
val header = headerRDD.flatMap(_.split(",")).collect

//headerRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at parallelize at command-2903591155643047:1
//header: Array[String] = Array(Name, Age, Sal)     

Затем прочитайте файл данных CSV.Либо сопоставьте каждую строку с классом дел или кортежем.Преобразуйте данные в DataFrame, передав массив заголовков.

val dataRdd = sc.parallelize(Seq(("Tom,22,500000"),("Rick,40,1000000"))) //Assume these lines are in your data CSV file
val data = dataRdd.map(_.split(",")).map(x => (x(0),x(1).toInt,x(2).toDouble)).toDF(header: _*)

//dataRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[72] at parallelize at command-2903591155643048:1
//data: org.apache.spark.sql.DataFrame = [Name: string, Age: int ... 1 more field]  

Результат:

data.show() 
+----+---+---------+
|Name|Age|      Sal|
+----+---+---------+
| Tom| 22| 500000.0|
|Rick| 40|1000000.0|
+----+---+---------+    
0 голосов
/ 06 июня 2018

Вы можете сделать это так

 val schema=spark
.read
.format("csv")
.option("header","true")
.option("delimiter",",")
.load("C:\\spark\\programs\\empheaders.csv")
.schema

val data=spark
.read
.format("csv")
.schema(schema)
.option("delimiter",",")
.load("C:\\spark\\programs\\empdata.csv")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...