Функция Spark Window с условием в текущей строке - PullRequest
0 голосов
/ 19 октября 2018

Я пытаюсь подсчитать для данного order_id, сколько заказов было за последние 365 дней, которые имели платеж.И это не проблема: я использую оконную функцию .

. Для меня это становится сложнее: я не хочу считать заказы в этом временном окне, где payment_dateэто после order_date текущего order_id.

В настоящее время у меня есть что-то вроде этого:

val window: WindowSpec = Window
  .partitionBy("customer_id")
  .orderBy("order_date")
  .rangeBetween(-365*days, -1)

и

df.withColumn("paid_order_count", count("*") over window)

, что будет считать все заказыдля клиента в течение последних 365 дней до его текущего заказа.

Как теперь включить условие для подсчета, которое учитывает order_date текущего заказа?

Пример:

+---------+-----------+-------------+------------+
|order_id |order_date |payment_date |customer_id |
+---------+-----------+-------------+------------+
|1        |2017-01-01 |2017-01-10   |A           |
|2        |2017-02-01 |2017-02-10   |A           |
|3        |2017-02-02 |2017-02-20   |A           |

Полученная таблица должнавыглядеть следующим образом:

+---------+-----------+-------------+------------+-----------------+
|order_id |order_date |payment_date |customer_id |paid_order_count |
+---------+-----------+-------------+------------+-----------------+
|1        |2017-01-01 |2017-01-10   |A           |0                |
|2        |2017-02-01 |2017-02-10   |A           |1                |
|3        |2017-02-02 |2017-02-20   |A           |1                |

Для order_id = 3 paid_order_count должно быть не 2, а 1, поскольку order_id = 2 выплачивается после размещения order_id = 3.

Я надеюсь, что я хорошо объяснил свою проблему и с нетерпением жду ваших идей!

1 Ответ

0 голосов
/ 20 октября 2018

Очень хороший вопрос !!!Пара замечаний, используя rangeBetween , создает фиксированный фрейм, основанный на количестве строк в нем, а не на значениях, поэтому это будет проблематично в 2 случаях:

  1. customerне имеет заказов каждый божий день, поэтому в окне 365 строк могут быть строки с order_date задолго до того, как год назад
  2. , если у клиента более одного заказа в день, он испортит годовой охват
  3. комбинация 1 и 2

Также rangeBetween не работает с типами данных Date и Timestamp.

Для решения этой проблемы можно использовать окноФункция со списками и UDF:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

  val df = spark.sparkContext.parallelize(Seq(
    (1, "2017-01-01", "2017-01-10", "A")
    , (2, "2017-02-01", "2017-02-10", "A")
    , (3, "2017-02-02", "2017-02-20", "A")
  )
  ).toDF("order_id", "order_date", "payment_date", "customer_id")
    .withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
    .withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))

//      df.printSchema()
//      df.show(false)

  val window = Window.partitionBy("customer_id").orderBy("order_date_ts").rangeBetween(Window.unboundedPreceding, -1)

  val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
      val bottom = top - (days * 60 * 60 * 24).toLong // in spark timestamps are in secconds, calculating the date days ago
      array.count(v => v >= bottom && v < top)
    }
  )

  val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
      .withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))

  res.show(false)

Вывод:

+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders             |paid_order_count|
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|1       |2017-01-01|2017-01-10  |A          |1483228800   |1484006400     |[]                      |0               |
|2       |2017-02-01|2017-02-10  |A          |1485907200   |1486684800     |[1484006400]            |1               |
|3       |2017-02-02|2017-02-20  |A          |1485993600   |1487548800     |[1484006400, 1486684800]|1               |
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+

Преобразование дат в метки времени Spark в секундах повышает эффективность использования списков.

ЭтоСамый простой для реализации код, но не самый оптимальный, так как списки занимают немного памяти, лучше подойдет пользовательский UDAF, но требует больше кодирования, что может сделать позже.Это будет работать, если у вас есть тысячи заказов на одного клиента.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...