Очень хороший вопрос !!!Пара замечаний, используя rangeBetween , создает фиксированный фрейм, основанный на количестве строк в нем, а не на значениях, поэтому это будет проблематично в 2 случаях:
- customerне имеет заказов каждый божий день, поэтому в окне 365 строк могут быть строки с order_date задолго до того, как год назад
- , если у клиента более одного заказа в день, он испортит годовой охват
- комбинация 1 и 2
Также rangeBetween не работает с типами данных Date и Timestamp.
Для решения этой проблемы можно использовать окноФункция со списками и UDF:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df = spark.sparkContext.parallelize(Seq(
(1, "2017-01-01", "2017-01-10", "A")
, (2, "2017-02-01", "2017-02-10", "A")
, (3, "2017-02-02", "2017-02-20", "A")
)
).toDF("order_id", "order_date", "payment_date", "customer_id")
.withColumn("order_date_ts", to_timestamp($"order_date", "yyyy-MM-dd").cast("long"))
.withColumn("payment_date_ts", to_timestamp($"payment_date", "yyyy-MM-dd").cast("long"))
// df.printSchema()
// df.show(false)
val window = Window.partitionBy("customer_id").orderBy("order_date_ts").rangeBetween(Window.unboundedPreceding, -1)
val count_filtered_dates = udf( (days: Int, top: Long, array: Seq[Long]) => {
val bottom = top - (days * 60 * 60 * 24).toLong // in spark timestamps are in secconds, calculating the date days ago
array.count(v => v >= bottom && v < top)
}
)
val res = df.withColumn("paid_orders", collect_list("payment_date_ts") over window)
.withColumn("paid_order_count", count_filtered_dates(lit(365), $"order_date_ts", $"paid_orders"))
res.show(false)
Вывод:
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|order_id|order_date|payment_date|customer_id|order_date_ts|payment_date_ts|paid_orders |paid_order_count|
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
|1 |2017-01-01|2017-01-10 |A |1483228800 |1484006400 |[] |0 |
|2 |2017-02-01|2017-02-10 |A |1485907200 |1486684800 |[1484006400] |1 |
|3 |2017-02-02|2017-02-20 |A |1485993600 |1487548800 |[1484006400, 1486684800]|1 |
+--------+----------+------------+-----------+-------------+---------------+------------------------+----------------+
Преобразование дат в метки времени Spark в секундах повышает эффективность использования списков.
ЭтоСамый простой для реализации код, но не самый оптимальный, так как списки занимают немного памяти, лучше подойдет пользовательский UDAF, но требует больше кодирования, что может сделать позже.Это будет работать, если у вас есть тысячи заказов на одного клиента.