Создать Dataframe из списка кортежей в Scala - PullRequest
0 голосов
/ 02 мая 2019

Я пытался создать Dataframe из списка кортежей в Scala, но у меня возникли проблемы.У меня есть мой список кортежей, таких как:

`mylist` = List((17988,2), (17988,54), (17988,41), (17988,1))

Это схема, которую я определил для двух столбцов:

 val `outputSchema` =
              StructType(
                List(
                  StructField("SAILORID", StringType, nullable = false),
                  StructField("ACTIVITYID", StringType, nullable = true)))

Я попробовал приведенный ниже код, но он выдает исключение нулевого указателя.

val df = spark.createDataFrame(mylist, outputSchema);

Я также попробовал приведенное ниже, но та же проблема с нулевым указателем:

val rdd = spark.sparkContext.parallelize(mylist) 
val df = rdd.toDF("name", "list")
df.show()

mylist = List((17988,2), (17988,54), (17988,41), (17988,1))

Это моя схема для Dataframe:

 val outputSchema =StructType(List(StructField("SAILORID", StringType, nullable = false),StructField("ACTIVITYID", StringType, nullable = true)))

val df = spark.createDataFrame(mylist, outputSchema);

Я хочу получить результатв Dataframe из каждого кортежа в одной строке в приведенном выше списке.У меня должно быть 4 ряда из 4-х кортежей вроде:

------------------------
| SAILORID | ACTIVITYID |
|----------|------------|
| 17988    |   2        |
| 17988    |   54       |
| 17988    |   41       |
| 17988    |    1       |
-------------------------

Ответы [ 2 ]

0 голосов
/ 03 мая 2019

Я запустил ваш код и получил правильный вывод.

с зависимостями:

scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)

Код:

val myList = List((17988, 2), (17988, 54), (17988, 41), (17988,1))
val rdd = spark.sparkContext.parallelize(myList)
val df = rdd.toDF("name", "list")
df.show()

и результат:

+-----+----+
| name|list|
+-----+----+
|17988|   2|
|17988|  54|
|17988|  41|
|17988|   1|
+-----+----+
0 голосов
/ 03 мая 2019

Я не уверен, поможет ли это, но я скопировал и вставил ваш пример, и кажется, что он работал для меня.

Какую версию spark вы используете?Мои версии:

  • Spark: 2.4.1 и
  • Scala: 2.11.12

Вот стенограмма моей сессии:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val myList = List((17988,2), (17988,54), (17988,41), (17988,1))
myList: List[(Int, Int)] = List((17988,2), (17988,54), (17988,41), (17988,1))

scala> val rdd = spark.sparkContext.parallelize(myList)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:28

scala> val df = rdd.toDF("name","list")
df: org.apache.spark.sql.DataFrame = [name: int, list: int]

scala> df.show
+-----+----+
| name|list|
+-----+----+
|17988|   2|
|17988|  54|
|17988|  41|
|17988|   1|
+-----+----+


scala> 

Можете ли вы попробовать выполнить операторы из моего транскрипта и ответить тем, что вы видите?

Кроме того, я не вижу возможности создать createDataFrame с такой подписью, которая соответствует этому вызову:

spark.createDataFrame(myList, outputSchema)

Единственный способ, который я вижу, который принимает список, а схема требует, чтобы список был java.util.List [org.apache.spark.sql.Row].

Вот список доступных методов:

def createDataFrame(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.DataFrame            
def createDataFrame(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.DataFrame                        
def createDataFrame(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_]): org.apache.spark.sql.DataFrame                                                       
def createDataFrame(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.DataFrame   
def createDataFrame(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_]): org.apache.spark.sql.DataFrame                                                                
def createDataFrame[A <: Product](data: Seq[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A]): org.apache.spark.sql.DataFrame                                
def createDataFrame(data: java.util.List[_],beanClass: Class[_]): org.apache.spark.sql.DataFrame                                                                         
def createDataFrame[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A]): org.apache.spark.sql.DataFrame            
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...