Условная конкатенация в Spark - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть фрейм данных со следующей структурой:

+----------+------+------+----------------+--------+------+
|      date|market|metric|aggregator_Value|type    |rank  |
+----------+------+------+----------------+--------+------+
|2018-08-05|    m1|   16 |              m1|median  |  1   |
|2018-08-03|    m1|    5 |              m1|median  |  2   |
|2018-08-01|    m1|   10 |              m1|mean    |  3   |
|2018-08-05|    m2|   35 |              m2|mean    |  1   |
|2018-08-03|    m2|   25 |              m2|mean    |  2   |
|2018-08-01|    m2|    5 |              m2|mean    |  3   |
+----------+------+------+----------------+--------+------+

В этом фрейме данных ранг столбца рассчитывается по порядку даты и группировки столбца рынка. Подобно этому

val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2=outputDF1.withColumn("rank",rank().over(w_rank))

Я хочу извлечь объединенное значение столбца metri c в выходном фрейме данных, когда rank = 1, с условием, что если type = "median" в rank = 1 Затем строка объединяет все значения metri c с этим рынком. В противном случае, если type = "mean" в строке rank = 1, тогда объединяются только предыдущие 2 значения metri c. Как это

+----------+------+------+----------------+--------+---------+
|      date|market|metric|aggregator_Value|type    |result   |
+----------+------+------+----------------+--------+---------+
|2018-08-05|    m1|   16 |              m1|median  |10|5|16  |
|2018-08-05|    m2|   35 |              m1|mean    |25|35    |
+----------+------+------+----------------+--------+---------+    

Как мне этого добиться?

1 Ответ

0 голосов
/ 22 апреля 2020

Вы можете обнулить столбец metric в соответствии с заданным условием c и применить collect_list, а затем concat_ws, чтобы получить требуемый результат, как показано ниже:

val df = Seq(
  ("2018-08-05", "m1", 16, "m1", "median", 1),
  ("2018-08-03", "m1",  5, "m1", "median", 2),
  ("2018-08-01", "m1", 10, "m1", "mean",   3),
  ("2018-08-05", "m2", 35, "m2", "mean",   1),
  ("2018-08-03", "m2", 25, "m2", "mean",   2),
  ("2018-08-01", "m2",  5, "m2", "mean",   3)
).toDF("date", "market", "metric", "aggregator_value", "type", "rank")

val win_desc = Window.partitionBy("market").orderBy(desc("date"))
val win_asc = Window.partitionBy("market").orderBy(asc("date"))

df.
  withColumn("rank1_type", first($"type").over(win_desc.rowsBetween(Window.unboundedPreceding, 0))).
  withColumn("cond_metric", when($"rank1_type" === "mean" && $"rank" > 2, null).otherwise($"metric")).
  withColumn("result", concat_ws("|", collect_list("cond_metric").over(win_asc))).
  where($"rank" === 1).
  show
// +----------+------+------+----------------+------+----+----------+-----------+-------+
// |      date|market|metric|aggregator_value|  type|rank|rank1_type|cond_metric| result|
// +----------+------+------+----------------+------+----+----------+-----------+-------+
// |2018-08-05|    m1|    16|              m1|median|   1|    median|         16|10|5|16|
// |2018-08-05|    m2|    35|              m2|  mean|   1|      mean|         35|  25|35|
// +----------+------+------+----------------+------+----+----------+-----------+-------+
...