Как сложить (случай, когда тогда) в Spark SQL DataFrame так же, как sql? - PullRequest
0 голосов
/ 10 марта 2020

Я новичок в Spark SQL, и я хочу рассчитать процент в моих данных с каждым статусом. Вот мои данные, как показано ниже:

A   B
11  1
11  3
12  1
13  3
12  2
13  1
11  1
12  2

Итак, я могу сделать это в SQL следующим образом:

select (C.oneTotal / C.total)   as onePercentage,
       (C.twoTotal / C.total)   as twotPercentage,
       (C.threeTotal / C.total) as threPercentage
from (select count(*) as total,
             A,
             sum(case when B = '1' then 1 else 0 end) as oneTotal,
             sum(case when B = '2' then 1 else 0 end) as twoTotal,
             sum(case when B = '3' then 1 else 0 end) as threeTotal
      from test
      group by A) as C;

Но в Spark SQL DataFrame сначала я вычисляю totalCount в каждом статусе, как показано ниже:

// wrong code
val cc = transDF.select("transData.*").groupBy("A")
      .agg(count("transData.*").alias("total"),
        sum(when(col("B") === "1", 1)).otherwise(0)).alias("oneTotal")
        sum(when(col("B") === "2", 1).otherwise(0)).alias("twoTotal")

Проблема в том, что сумма (когда) результат равен нулю. enter image description here

Я неправильно использую его? Как реализовать это в Spark SQL так же, как мой выше SQL? А потом подсчитать долю каждого статуса?

Спасибо за помощь. В конце концов, я решаю это с суммой (когда). ниже мой текущий код.

val cc = transDF.select("transData.*").groupBy("A")
      .agg(count("transData.*").alias("total"),
        sum(when(col("B") === "1", 1).otherwise(0)).alias("oneTotal"),
        sum(when(col("B") === "2", 1).otherwise(0)).alias("twoTotal"))
      .select(col("total"),
        col("A"),
        col("oneTotal") / col("total").alias("oneRate"),
        col("twoTotal") / col("total").alias("twoRate"))

Еще раз спасибо.

Ответы [ 2 ]

1 голос
/ 10 марта 2020

вы можете использовать sum(when(... или также count(when.., второй вариант короче, чтобы написать:

val df = Seq(
  (11, 1),
  (11, 3),
  (12, 1),
  (13, 3),
  (12, 2),
  (13, 1),
  (11, 1),
  (12, 2)
).toDF("A", "B")

df
  .groupBy($"A")
  .agg(
    count("*").as("total"),
    count(when($"B"==="1",$"A")).as("oneTotal"),
    count(when($"B"==="2",$"A")).as("twoTotal"),
    count(when($"B"==="3",$"A")).as("threeTotal")
  )
  .select(
    $"A",
    ($"oneTotal"/$"total").as("onePercentage"),
    ($"twoTotal"/$"total").as("twoPercentage"),
    ($"threeTotal"/$"total").as("threePercentage")
  )
  .show()

дает

+---+------------------+------------------+------------------+
|  A|     onePercentage|     twoPercentage|   threePercentage|
+---+------------------+------------------+------------------+
| 12|0.3333333333333333|0.6666666666666666|               0.0|
| 13|               0.5|               0.0|               0.5|
| 11|0.6666666666666666|               0.0|0.3333333333333333|
+---+------------------+------------------+------------------+

в качестве альтернативы, вы можете получить "длинный Таблица с оконными функциями:

df
  .groupBy($"A",$"B").count()
  .withColumn("total",sum($"count").over(Window.partitionBy($"A")))
  .select(
    $"A",
    $"B",
    ($"count"/$"total").as("percentage")
  ).orderBy($"A",$"B")
  .show()

+---+---+------------------+
|  A|  B|        percentage|
+---+---+------------------+
| 11|  1|0.6666666666666666|
| 11|  3|0.3333333333333333|
| 12|  1|0.3333333333333333|
| 12|  2|0.6666666666666666|
| 13|  1|               0.5|
| 13|  3|               0.5|
+---+---+------------------+
1 голос
/ 10 марта 2020

Насколько я понял, вы хотите реализовать логику c, как показано выше sql, показанную в вопросе.

один из способов подобен приведенному ниже примеру

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object AggTest extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)


  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  val df = Seq(
    (11, 1),
    (11, 3),
    (12, 1),
    (13, 3),
    (12, 2),
    (13, 1),
    (11, 1),
    (12, 2)
  ).toDF("A", "B")

  df.show(false)
  df.createOrReplaceTempView("test")
  spark.sql(
    """
      |select (C.oneTotal / C.total)   as onePercentage,
      |       (C.twoTotal / C.total)   as twotPercentage,
      |       (C.threeTotal / C.total) as threPercentage
      |from (select count(*) as total,
      |             A,
      |             sum(case when B = '1' then 1 else 0 end) as oneTotal,
      |             sum(case when B = '2' then 1 else 0 end) as twoTotal,
      |             sum(case when B = '3' then 1 else 0 end) as threeTotal
      |      from test
      |      group by A) as C
    """.stripMargin).show


}

Результат:

+---+---+
|A  |B  |
+---+---+
|11 |1  |
|11 |3  |
|12 |1  |
|13 |3  |
|12 |2  |
|13 |1  |
|11 |1  |
|12 |2  |
+---+---+

+------------------+------------------+------------------+
|     onePercentage|    twotPercentage|    threPercentage|
+------------------+------------------+------------------+
|0.3333333333333333|0.6666666666666666|               0.0|
|               0.5|               0.0|               0.5|
|0.6666666666666666|               0.0|0.3333333333333333|
+------------------+------------------+------------------+
...