Как разделить RDD на разные RDD на основе значения и передать каждую часть функции - PullRequest
0 голосов
/ 06 января 2020

У меня есть RDD, в котором каждый элемент является классом дела, например: case class Element(target: Boolean, data: String) Теперь мне нужно разделить RDD на основе данных String (это дискретная переменная). А затем выполняйте функцию def f(elements: RDD[Element]): Double при каждом разделении.

Я попытался создать паруRDD следующим образом: val test = elementsRDD.map(E => (E.data, E)), поэтому у меня есть пары (ключ, значение), но я не знаю, что делать после это (как разделить их, потому что groupBy возвращает Iteravle (V), а не СДР всех значений).

Я также мог бы отфильтровать каждое возможное значение data: String и выполнить функцию f по результатам. Но я не знаю всех возможных значений, которые «data: String» может заранее принять. И кажется неэффективным сначала go просмотреть все данные, чтобы проверить различные возможности, а затем также несколько раз отфильтровать их.

Так есть ли способ сделать это эффективно?

Ответы [ 2 ]

1 голос
/ 06 января 2020

Все, что вам действительно нужно сделать, это подсчитать путем агрегирования по data, в зависимости от 2 значений, которые может принимать логическое значение. Остальное - простое вычисление, которое зависит только от этих двух значений.

val rdd = sc.parallelize(
  Seq(Element(true,"a"),Element(false,"a"),Element(true,"a"),
    Element(false,"b"),Element(false,"b"),Element(true,"b")))

val log2 = math.log(2)

// calculate an RDD[(String, (Int, Int))], first element of the tuple is the number of "true"s, and the second the number of "false"s
val entropy = rdd.map(e => (e.data, e.target)).aggregateByKey((0, 0))({
  case ((t, f), target) => if (target) (t + 1, f) else (t, f + 1)
}, {
  case ((t1, f1), (t2, f2)) => (t1 + t2, f1 + f2)
}).mapValues {
  case (t, f) =>
    val total = (t + f).toDouble
    val trueRatio = t.toDouble / total
    val falseRatio = f.toDouble / total
    -trueRatio * math.log(trueRatio) / log2 + falseRatio * math.log(falseRatio) / log2
}

// entropy is an RDD[(String, Double)]
entropy foreach println
// (a,-0.1383458330929479)
// (b,0.1383458330929479)
0 голосов
/ 14 января 2020

Ответ с использованием DataFrame:

import spark.implicits._

val rdd = sc.parallelize(
      Seq(Element(true,"a"),Element(false,"a"),Element(true,"a"),
        Element(false,"b"),Element(false,"b"),Element(true,"b")))

val log2 = math.log(2)

var df = rdd.toDF()

val groupedData = df.groupBy($"data")
  .agg(count(when($"target" === true, 1)).alias("true"),count(when($"target" === false, 1)).alias("false"))
  .withColumn("total", $"true" + $"false")
  .withColumn("true ratio", $"true" / $"total")
  .withColumn("false ratio", $"false" / $"total")
  .withColumn("entropy", -$"true ratio" * log($"true ratio") / log2 + $"false ratio" * log($"false ratio") / log2)
  .show()

Вывод:

+----+----+-----+-----+------------------+------------------+-------------------+
|data|true|false|total|        true ratio|       false ratio|            entropy|
+----+----+-----+-----+------------------+------------------+-------------------+
|   b|   1|    2|    3|0.3333333333333333|0.6666666666666666| 0.1383458330929479|
|   a|   2|    1|    3|0.6666666666666666|0.3333333333333333|-0.1383458330929479|
+----+----+-----+-----+------------------+------------------+-------------------+
...