Преобразование RDD в Dataframe - PullRequest
       10

Преобразование RDD в Dataframe

0 голосов
/ 24 сентября 2019

Я новичок в spark / scala.У меня есть созданный ниже RDD путем загрузки данных из нескольких путей.Теперь я хочу создать dataframe из того же для дальнейших операций.ниже должна быть схема dataframe

schema[UserId, EntityId, WebSessionId, ProductId]

rdd.foreach(println)

545456,5615615,DIKFH6545614561456,PR5454564656445454
875643,5485254,JHDSFJD543514KJKJ4
545456,5615615,DIKFH6545614561456,PR5454564656445454
545456,5615615,DIKFH6545614561456,PR5454564656445454
545456,5615615,DIKFH6545614561456,PR54545DSKJD541054
264264,3254564,MNXZCBMNABC5645SAD,PR5142545564542515
732543,8765984,UJHSG4240323545144
564574,6276832,KJDXSGFJFS2545DSAS

Кто-нибудь, пожалуйста, помогите мне .... !!!

Я попробовал то же самое, определив класс схемы и сопоставив его с rdd, но получая ошибку

"ArrayIndexOutOfBoundsException: 3"

1 Ответ

1 голос
/ 24 сентября 2019

Если вы рассматриваете ваши столбцы как String, вы можете создать следующее:

import org.apache.spark.sql.Row

val rdd : RDD[Row] = ???

val df = spark.createDataFrame(rdd, StructType(Seq(
  StructField("userId", StringType, false),
  StructField("EntityId", StringType, false),
  StructField("WebSessionId", StringType, false),
  StructField("ProductId", StringType, true))))

Обратите внимание, что вы должны "сопоставить" свой RDD с RDD [Row], чтобы компилятор разрешил использовать "метод createDataFrame ".Для пропущенных полей вы можете объявить столбцы как обнуляемые в схеме DataFrame.

В вашем примере вы используете метод RDD spark.sparkContext.textFile () .Этот метод возвращает RDD [String], что означает, что каждый элемент вашего RDD является строкой.Но вам нужен RDD [Row].Поэтому вам нужно разделить вашу строку запятыми, например:

val list = 
 List("545456,5615615,DIKFH6545614561456,PR5454564656445454",
   "875643,5485254,JHDSFJD543514KJKJ4", 
   "545456,5615615,DIKFH6545614561456,PR5454564656445454", 
   "545456,5615615,DIKFH6545614561456,PR5454564656445454", 
   "545456,5615615,DIKFH6545614561456,PR54545DSKJD541054", 
   "264264,3254564,MNXZCBMNABC5645SAD,PR5142545564542515", 
"732543,8765984,UJHSG4240323545144","564574,6276832,KJDXSGFJFS2545DSAS")


val FilterReadClicks = spark.sparkContext.parallelize(list)

val rows: RDD[Row] = FilterReadClicks.map(line => line.split(",")).map { arr =>
  val array = Row.fromSeq(arr.foldLeft(List[Any]())((a, b) => b :: a))
  if(array.length == 4) 
    array
  else Row.fromSeq(array.toSeq.:+(""))
}

rows.foreach(el => println(el.toSeq))

val df = spark.createDataFrame(rows, StructType(Seq(
  StructField("userId", StringType, false),
  StructField("EntityId", StringType, false),
  StructField("WebSessionId", StringType, false),
  StructField("ProductId", StringType, true))))

df.show()

+------------------+------------------+------------+---------+
|            userId|          EntityId|WebSessionId|ProductId|
+------------------+------------------+------------+---------+
|PR5454564656445454|DIKFH6545614561456|     5615615|   545456|
|JHDSFJD543514KJKJ4|           5485254|      875643|         |
|PR5454564656445454|DIKFH6545614561456|     5615615|   545456|
|PR5454564656445454|DIKFH6545614561456|     5615615|   545456|
|PR54545DSKJD541054|DIKFH6545614561456|     5615615|   545456|
|PR5142545564542515|MNXZCBMNABC5645SAD|     3254564|   264264|
|UJHSG4240323545144|           8765984|      732543|         |
|KJDXSGFJFS2545DSAS|           6276832|      564574|         |
+------------------+------------------+------------+---------+

С помощью строк rdd вы сможете создать кадр данных.

...