Невозможно применить AggregateByKey, используя Spark Scala для получения (id, total, (max, min)) для данного ввода (id, value) - PullRequest
0 голосов
/ 14 сентября 2018

Я пробовал этот код, но он говорит, что кортежи не могут быть деструктурированы.

forditems.map(stre=>(stre.split(",")(1).toInt, stre.split(",")(4).toFloat)).aggregateByKey((0.0f, (0.0f, 0.0f)))(
(t,v) => (t._1 + v, ( if(v > t._2._1) v else  t._2._1 , if(v > t._2._2) v else  t._2._2 ))
(x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else  x._2._1, if(x._2_.2 < t._2._2) t._2._2 else  x._2._2 )))

сообщение об ошибке

<console>:7: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
  Either create a single parameter accepting the Tuple1,
  or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
 (x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else  x._2._1, 
 if(x._2_.2 < t._2._2) t._2._2 else  x._2._2 ))).filter(stre=> 
 stre._1==2).take(10).foreach(println)
 ^
 <console>:7: error: ')' expected but double literal found.
 (x,t) => (x._1 + t._1, (if(x._2._1 < t._2._1) t._2._1 else  x._2._1, 
 if(x._2_.2 < t._2._2) t._2._2 else  x._2._2 ))).filter(stre=> 
 stre._1==2).take(10).foreach(println)

1 Ответ

0 голосов
/ 14 сентября 2018

Здесь на самом деле есть только небольшие синтаксические ошибки:

  • Отсутствует запятая между первым и вторым аргументами, передаваемыми в aggregateByKey (конец второй строки)
  • Вы ввели x._2_.2 вместо x._2._2

Исправление этих данных даст желаемый результат.

НО - Тем не менее, стоит отметить, что Scala предлагает гораздо более приятный синтаксическийварианты работы с кортежами и выполнения простых арифметических операций.Что еще более важно, кортежи не следует использовать слишком часто, и распространенной альтернативой является создание класса case , который лучше поддерживает операции, которые вы пытаетесь выполнить.

Например, мыможет создать простой Stats класс case, который имеет метод agg:

case class Stats(total: Float, min: Float, max: Float) {
  def agg(other: Stats): Stats = Stats(
    total + other.total,
    math.min(min, other.min),
    math.max(max, other.max)
  )
}

и затем использовать reduceByKey с этой функцией:

val result: RDD[(Int, Stats)] = forditems
  .map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat))
  .mapValues(f => Stats(f, f, f))
  .reduceByKey(_ agg _)

Более того, если вы 'готов использовать Spark DateFrames - это становится еще проще:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = forditems.map(_.split(",")).map(arr => (arr(1).toInt, arr(4).toFloat)).toDF("k", "v")

val resultDf = df.groupBy("k").agg(sum($"v"), min($"v"), max($"v"))
resultDf.show()

// +---+------+------+------+
// |  k|sum(v)|min(v)|max(v)|
// +---+------+------+------+
// |  1| 444.0|   4.0| 400.0|
// +---+------+------+------+
...