условный оператор с groupby на уровне искры rdd - scala - PullRequest
0 голосов
/ 26 ноября 2018

Я использую Spark 1.60 и Scala 2.10.5

У меня есть такой фрейм данных,

+------------------+
|id | needed       | 
+------------------+
|1  | 2            |                                                                                                                                    
|1  | 0            |
|1  | 3            |
|2  | 0            |
|2  | 0            |
|3  | 1            |
|3  | 2            |                                                                                                    
+------------------+

Из этого df я создал rdd вот так,

 val  dfRDD = df.rdd

из моего rdd, я хочу сгруппировать по id, а количество needed равно > 0.

((1, 2), (2,0), (3,2))

Итак, я попробовал вот так,

val groupedDF = dfRDD.map(x =>(x(0), x(1) > 0)).count.redueByKey(_+_)

В этом случае, Я получаю сообщение об ошибке:

ошибка: значение> не является членом какого-либо

Мне нужно это на уровне rdd.Любая помощь, чтобы получить желаемый результат, была бы отличной.

Ответы [ 2 ]

0 голосов
/ 26 ноября 2018

Проблема в том, что в вашем map вы вызываете метод apply из Row , и, как вы можете видеть в его scaladoc, этот метод возвращает Любой - и, как вы можете видеть, для ошибки и из scaladoc естьне такой метод < в любом

Вы можете исправить это с помощью метода getAs[T].

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .getOrCreate()
import spark.implicits._

val df =
  List(
    (1, 2),
    (1, 0),
    (1, 3),
    (2, 0),
    (2, 0),
    (3, 1),
    (3, 2)
  ).toDF("id", "needed")

val rdd: RDD[(Int, Int)] = df.rdd.map(row => (row.getAs[Int](fieldName = "id"), row.getAs[Int](fieldName = "needed")))

Оттуда вы можете продолжить агрегацию, выесть несколько ошибок в вашей логике.
Во-первых, вам не нужен вызов count.
И, во-вторых, если вам нужно посчитать, сколько раз "needed" был больше, чем тот, который вы не можетесделайте _ + _, потому что это сумма необходимых значений.

val grouped: RDD[(Int, Int)] = rdd.reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }

val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))

PS: Вы должны сказать своему профессору перейти на Spark 2 и Scala 2.11;)

Edit

Использование case-классов в приведенном выше примере.

final case class Data(id: Int, needed: Int)
val rdd: RDD[Data] = df.as[Data].rdd
val grouped: RDD[(Int, Int)] = rdd.map(d => d.id -> d.needed).reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }  
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))
0 голосов
/ 26 ноября 2018

Нет необходимости выполнять вычисления на уровне rdd.Агрегация с фреймом данных должна работать:

df.groupBy("id").agg(sum(($"needed" > 0).cast("int")).as("positiveCount")).show
+---+-------------+
| id|positiveCount|
+---+-------------+
|  1|            2|
|  3|            2|
|  2|            0|
+---+-------------+

Если вам нужно работать с RDD, используйте row.getInt или как ответ @Luis row.getAs[Int], чтобы получить значение с явным типом, а затем выполните командусравнение и reduceByKey:

df.rdd.map(r => (r.getInt(0), if (r.getInt(1) > 0) 1 else 0)).reduceByKey(_ + _).collect
// res18: Array[(Int, Int)] = Array((1,2), (2,0), (3,2))
...