Это кажется довольно трудным, если вы просто используете функции Window
, потому что вы не можете ссылаться на верхний предел sendTime
при попытке определить, находится ли значение из openTime
в течение последних 2 часов от верхнего предела sendTime
.
С spark 2.4 появились функции более высокого порядка, о которых вы можете прочитать здесь (https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html). Используя их, вы можете собрать все openTime
в пределах окна, используя функцию collect_list
, а затем использовать функцию более высокого порядка filter
отфильтруйте openTimes
за два часа до sendTime
. Наконец, вы можете подсчитать значения, оставшиеся в списке, чтобы подсчитать, чего вы хотите. Вот мой код для этого.
import org.apache.spark.sql.expressions.Window
val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),
("user2", "2018-04-05 18:00:00", null),
("user2", "2018-04-05 19:00:00", null)
).toDF("id", "sendTime", "openTime")
var df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
.withColumn("openUnix", unix_timestamp($"openTime"))
val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))
df3.show(false)
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|id |sendTime |openTime |sendUnix |openUnix |opened |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800] |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800] |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 18:00:00|null |1522947600|null |[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 19:00:00|null |1522951200|null |[1522946400, 1522947000] |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
"size(filter(opened, x -> x < sendUnix AND x > sendUnix - 7200)) as count")
df4.show(false)
+-----+-------------------+-------------------+----------+----------+-----+
|id |sendTime |openTime |sendUnix |openUnix |count|
+-----+-------------------+-------------------+----------+----------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2 |
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0 |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1 |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2 |
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1 |
|user2|2018-04-05 18:00:00|null |1522947600|null |3 |
|user2|2018-04-05 19:00:00|null |1522951200|null |2 |
+-----+-------------------+-------------------+----------+----------+-----+