Объединение записей RDD для получения одной строки с несколькими условными счетчиками - PullRequest
0 голосов
/ 06 мая 2019

В качестве небольшого контекста я пытаюсь достичь нескольких строк, сгруппированных по определенному набору ключей, после этого первого сокращения я бы хотел сгруппировать их в общую строку, например, по дате. с каждым из сгруппированных счетчиков, предварительно рассчитанных. Это может показаться непонятным, просто прочитав его, вот пример вывода (довольно простой, ничего сложного) того, что должно произойти.

(("Volvo", "T4", "2019-05-01"), 5)
(("Volvo", "T5", "2019-05-01"), 7)
(("Audi", "RS6", "2019-05-01"), 4)

И однажды объединили эти объекты Роу ...

date         , volvo_counter     , audi_counter
"2019-05-01" , 12                , 4

Я считаю, что это довольно сложный случай, и что могут быть разные подходы, но мне было интересно, было ли какое-либо решение в одном и том же СДР, поэтому нет необходимости в нескольких СДР, разделенных на счетчик.

Ответы [ 2 ]

2 голосов
/ 06 мая 2019

То, что вы хотите сделать, это разворот. Вы говорите о RDD, поэтому я предполагаю, что ваш вопрос: «как сделать сводку с помощью RDD API?». Насколько я знаю, в RDD API нет встроенной функции, которая бы это делала. Вы можете сделать это самостоятельно так:

// let's create sample data
val rdd = sc.parallelize(Seq(
  (("Volvo", "T4", "2019-05-01"), 5),
  (("Volvo", "T5", "2019-05-01"), 7),
  (("Audi", "RS6", "2019-05-01"), 4)
))

// If the keys are not known in advance, we compute their distinct values
val values = rdd.map(_._1._1).distinct.collect.toSeq
// values: Seq[String] = WrappedArray(Volvo, Audi)

// Finally we make the pivot and use reduceByKey on the sequence
val res = rdd
    .map{ case ((make, model, date), counter) =>
        date -> values.map(v => if(make == v) counter else 0)
    }
    .reduceByKey((a, b) => a.indices.map(i => a(i) + b(i)))

// which gives you this
res.collect.head
// (String, Seq[Int]) = (2019-05-01,Vector(12, 4))

Обратите внимание, что вы можете написать гораздо более простой код с помощью SparkSQL API:

// let's first transform the previously created RDD to a dataframe:
val df = rdd.map{ case ((a, b, c), d) => (a, b, c, d) }
    .toDF("make", "model", "date", "counter")

// And then it's as simple as that:
df.groupBy("date")
  .pivot("make")
  .agg(sum("counter"))
  .show

+----------+----+-----+
|      date|Audi|Volvo|
+----------+----+-----+
|2019-05-01|   4|   12|
+----------+----+-----+
1 голос
/ 06 мая 2019

Я думаю, что проще сделать с DataFrame:

   val data = Seq(
      Record(Key("Volvo", "2019-05-01"), 5),
      Record(Key("Volvo", "2019-05-01"), 7),
      Record(Key("Audi", "2019-05-01"), 4)
    )

    val rdd = spark.sparkContext.parallelize(data)

    val df = rdd.toDF()

    val modelsExpr = df
      .select("key.model").as("model")
      .distinct()
      .collect()
      .map(r => r.getAs[String]("model"))
      .map(m => sum(when($"key.model" === m, $"value").otherwise(0)).as(s"${m}_counter"))

    df
      .groupBy("key.date")
      .agg(modelsExpr.head, modelsExpr.tail: _*)
      .show(false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...