Spark: пользовательская функция окна - PullRequest
0 голосов
/ 19 июня 2020

У меня есть таблица заказов. Для каждого заказа есть разные продукты, которые представлены в разном количестве. Пример:

+---------------+-------------+--------+
|order_id       |   product_id|quantity|
+---------------+-------------+--------+
|A              |X            |       5|
|A              |Y            |       1|
|A              |Z            |       3|

Затем покупатель может решить вернуть продукт, и это будет отмечено как отрицательное количество в том же порядке:

+---------------+-------------+--------+
|order_id       |   product_id|quantity|
+---------------+-------------+--------+
|A              |X            |       5|
|A              |Y            |       1|
|A              |Z            |       3|
|A              |X            |      -1|
|A              |Z            |      -1|

Мне нужно создать новый столбец с именем position_number, который присваивает порядковый номер записям с положительным количеством, принадлежащим одному и тому же порядку (и это легко, я просто использую функцию row_number). Затем сложная часть: мне нужно присвоить position_number записям с отрицательными количествами, суммируя 1000 с position_number соответствующего продукта того же порядка. Конечный результат должен быть:

+---------------+---------------+-------------+--------+
|position_number|order_id       |   product_id|quantity|
+---------------+---------------+-------------+--------+
|              1|A              |X            |       5|
|              2|A              |Y            |       1|
|              3|A              |Z            |       3|
|           1001|A              |X            |      -1|
|           1003|A              |Z            |      -1|

Как это сделать? Приветствуется любое решение со Spark (python, scala, SQL ..)

1 Ответ

0 голосов
/ 19 июня 2020

Проверьте, помогает ли это -

  df.withColumn("is_negative", $"quantity" < 0)
      .withColumn("position_number", row_number()
      .over(Window.partitionBy($"order_id", $"is_negative").orderBy("product_id")))
      .withColumn("position_number",
        when($"is_negative", max(expr("if(is_negative, 0, position_number)"))
          .over(Window.partitionBy("order_id", "product_id")) + 1000)
          .otherwise($"position_number")
      )
      .show(false)

    /**
      * +--------+----------+--------+-----------+---------------+
      * |order_id|product_id|quantity|is_negative|position_number|
      * +--------+----------+--------+-----------+---------------+
      * |A       |Y         |1       |false      |2              |
      * |A       |Z         |3       |false      |3              |
      * |A       |Z         |-1      |true       |1003           |
      * |A       |X         |5       |false      |1              |
      * |A       |X         |-1      |true       |1001           |
      * +--------+----------+--------+-----------+---------------+
      */
...