Как решить, что «aggregateByKey не является членом org.apache.spark.sql.Dataset» в Spark? - PullRequest
0 голосов
/ 02 апреля 2019

Я пытаюсь этот пример:

https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/

Но вместо СДР я использую фрейм данных.

Я попробовал следующее:

val aggrRDD = student_df.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
                       .aggregateByKey(zeroVal)(seqOp, combOp) 

, который является частью этого фрагмента кода:



val student_df = sc.parallelize(Array(
    ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82), 
    ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), 
    ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), 
    ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74), 
    ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), 
    ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), 
    ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60)), 3).toDF("student", "subject", "marks")


def seqOp = (accumulator: Int, element: (String, Int)) => 
    if(accumulator > element._2) accumulator else element._2

def combOp = (accumulator1: Int, accumulator2: Int) => 
    if(accumulator1 > accumulator2) accumulator1 else accumulator2


val zeroVal = 0

val aggrRDD = student_df.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
                       .aggregateByKey(zeroVal)(seqOp, combOp) 


Это дает эту ошибку:

error: value aggregateByKey is not a member of org.apache.spark.sql.Dataset[(String, (String, Int))]

Возможной причиной может быть отсутствие точки с запятой до value aggregateByKey?

Что я здесь не так делаю? Как мне работать с фреймами данных или наборами данных?

1 Ответ

2 голосов
/ 02 апреля 2019

Попробуйте вызвать rdd после student_df и перед картой:

val aggrRDD = student_df.rdd.map(r => (r.getString(0), (r.getString(1), r.getInt(2))))
          .aggregateByKey(zeroVal)(seqOp, combOp)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...