Как я могу уменьшить карту в группе искровых данных по условным столбцам? - PullRequest
0 голосов
/ 02 апреля 2019

Мой искровой фрейм выглядит так:

+------+------+-------+------+
|userid|useid1|userid2|score |
+------+------+-------+------+
|23    |null  |dsad   |3     |
|11    |44    |null   |4     |
|231   |null  |temp   |5     |
|231   |null  |temp   |2     |
+------+------+-------+------+

Я хочу выполнить расчет для каждой пары userid и useid1 / userid2 (в зависимости от того, что не равно нулю).

И если это useid1, я умножаю счет на 5, если это userid2, я умножаю счет на 3.

Наконец, я хочу добавить все оценки для каждой пары.

Результат должен быть:

+------+--------+-----------+
|userid|useid1/2|final score|
+------+--------+-----------+
|23    |dsad    |9          |
|11    |44      |20         |
|231   |temp    |21         |
+------+------+-------------+

Как я могу это сделать?

Для части groupBy я знаю, что в dataframe есть функция groupBy, но я не знаю, могу ли я использовать ее условно, например, если userid1 равен нулю, groupby(userid, userid2), если userid2 равен нулю, groupby(userid, useid1).

Для части расчета, как умножить 3 или 5 на основе условия?

Ответы [ 3 ]

1 голос
/ 02 апреля 2019

Приведенное ниже решение поможет решить вашу проблему.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

  val groupByUserWinFun = Window.partitionBy("userid","useid1/2")
  val finalScoreDF = userDF.withColumn("useid1/2", when($"userid1".isNull, $"userid2").otherwise($"userid1"))
    .withColumn("finalscore", when($"userid1".isNull, $"score" * 3).otherwise($"score" * 5))
    .withColumn("finalscore", sum("finalscore").over(groupByUserWinFun))
    .select("userid", "useid1/2", "finalscore").distinct()

с использованием метода when в Spark SQL, выберите userid1 или 2 и умножьте на значения, основанные на условии

Вывод:

+------+--------+----------+
|userid|useid1/2|finalscore|
+------+--------+----------+
|   11 |      44|      20.0|
|   23 |    dsad|       9.0|
|   231|    temp|      21.0|
+------+--------+----------+
0 голосов
/ 02 апреля 2019

Группировка по будет работать:

val original = Seq(
  (23, null, "dsad", 3),
  (11, "44", null, 4),
  (231, null, "temp", 5),
  (231, null, "temp", 2)
).toDF("userid", "useid1", "userid2", "score")

// action
val result = original
  .withColumn("useid1/2", coalesce($"useid1", $"userid2"))
  .withColumn("score", $"score" * when($"useid1".isNotNull, 5).otherwise(3))
  .groupBy("userid", "useid1/2")
  .agg(sum("score").alias("final score"))

result.show(false)

Выход:

+------+--------+-----------+
|userid|useid1/2|final score|
+------+--------+-----------+
|23    |dsad    |9          |
|231   |temp    |21         |
|11    |44      |20         |
+------+--------+-----------+
0 голосов
/ 02 апреля 2019

coalesce сделает необходимое.

df.withColumn("userid1/2", coalesce(col("useid1"), col("useid1")))

в основном эта функция возвращает первое ненулевое значение заказа

документация:

COALESCE(T v1, T v2, ...)

Returns the first v that is not NULL, or NULL if all v's are NULL.

нужен импорт import org.apache.spark.sql.functions.coalesce

...