Проблема в том, что в вашем map
вы вызываете метод apply
из Row , и, как вы можете видеть в его scaladoc, этот метод возвращает Любой - и, как вы можете видеть, для ошибки и из scaladoc естьне такой метод <
в любом
Вы можете исправить это с помощью метода getAs[T]
.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
val spark =
SparkSession
.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
val df =
List(
(1, 2),
(1, 0),
(1, 3),
(2, 0),
(2, 0),
(3, 1),
(3, 2)
).toDF("id", "needed")
val rdd: RDD[(Int, Int)] = df.rdd.map(row => (row.getAs[Int](fieldName = "id"), row.getAs[Int](fieldName = "needed")))
Оттуда вы можете продолжить агрегацию, выесть несколько ошибок в вашей логике.
Во-первых, вам не нужен вызов count
.
И, во-вторых, если вам нужно посчитать, сколько раз "needed"
был больше, чем тот, который вы не можетесделайте _ + _
, потому что это сумма необходимых значений.
val grouped: RDD[(Int, Int)] = rdd.reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))
PS: Вы должны сказать своему профессору перейти на Spark 2 и Scala 2.11;)
Edit
Использование case-классов в приведенном выше примере.
final case class Data(id: Int, needed: Int)
val rdd: RDD[Data] = df.as[Data].rdd
val grouped: RDD[(Int, Int)] = rdd.map(d => d.id -> d.needed).reduceByKey { (acc, v) => if (v > 0) acc + 1 else acc }
val result: Array[(Int, Int)] = grouped.collect()
// Array((1,3), (2,0), (3,2))