Функция Spark Window, использующая более одного столбца - PullRequest
1 голос
/ 09 марта 2019

У меня есть этот фрейм данных, который показывает время отправки и время открытия для каждого пользователя:

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
             ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
             ("user2", "2018-04-05 18:00:00", null),
             ("user2", "2018-04-05 19:00:00", null)              
            ).toDF("id", "sendTime", "openTime")

+-----+-------------------+-------------------+
|   id|           sendTime|           openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00|               null|
|user2|2018-04-05 19:00:00|               null|
+-----+-------------------+-------------------+

Теперь я хочу посчитать количество открытий, которые произошли за последние два часа, сКаждое время отправки для каждого пользователя.Я использовал оконную функцию для разделения по пользователю, но я не мог понять, как сравнивать значения из столбца sendTime со столбцом openTime.Результирующий фрейм данных должен выглядеть следующим образом:

+-----+-------------------+-------------------+-----+
|   id|           sendTime|           openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|    2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|    2|
|user2|2018-04-05 18:00:00|               null|    3|
|user2|2018-04-05 19:00:00|               null|    2|
+-----+-------------------+-------------------+-----+

Это так далеко, как я получил, но не дает мне то, что мне нужно:

var df2 = df.withColumn("sendUnix", F.unix_timestamp($"sendTime")).withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2*60*60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))

Ответы [ 2 ]

2 голосов
/ 09 марта 2019

Это кажется довольно трудным, если вы просто используете функции Window, потому что вы не можете ссылаться на верхний предел sendTime при попытке определить, находится ли значение из openTime в течение последних 2 часов от верхнего предела sendTime.

С spark 2.4 появились функции более высокого порядка, о которых вы можете прочитать здесь (https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html). Используя их, вы можете собрать все openTime в пределах окна, используя функцию collect_list, а затем использовать функцию более высокого порядка filter отфильтруйте openTimes за два часа до sendTime. Наконец, вы можете подсчитать значения, оставшиеся в списке, чтобы подсчитать, чего вы хотите. Вот мой код для этого.

import org.apache.spark.sql.expressions.Window

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
             ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
             ("user2", "2018-04-05 18:00:00", null),
             ("user2", "2018-04-05 19:00:00", null)              
            ).toDF("id", "sendTime", "openTime")

var df2 = df.withColumn("sendUnix", unix_timestamp($"sendTime"))
            .withColumn("openUnix", unix_timestamp($"openTime"))

val df3 = df2.withColumn("opened", collect_list($"openUnix").over(w))

df3.show(false)

+-----+-------------------+-------------------+----------+----------+------------------------------------+
|id   |sendTime           |openTime           |sendUnix  |openUnix  |opened                              |
+-----+-------------------+-------------------+----------+----------+------------------------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|[1522950600, 1522947000, 1522943400]|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|[1522939800]                        |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|[1522943400, 1522939800]            |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|[1522947000, 1522943400, 1522939800]|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 18:00:00|null               |1522947600|null      |[1522946400, 1522947000, 1522943400]|
|user2|2018-04-05 19:00:00|null               |1522951200|null      |[1522946400, 1522947000]            |
+-----+-------------------+-------------------+----------+----------+------------------------------------+

val df4 = df3.selectExpr("id", "sendTime", "openTime", "sendUnix", "openUnix",
        "size(filter(opened, x -> x < sendUnix AND  x > sendUnix - 7200)) as count")

df4.show(false)

+-----+-------------------+-------------------+----------+----------+-----+
|id   |sendTime           |openTime           |sendUnix  |openUnix  |count|
+-----+-------------------+-------------------+----------+----------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|1522947600|1522950600|2    |
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|1522936800|1522939800|0    |
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|1522940400|1522943400|1    |
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|1522944000|1522947000|2    |
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|1522945800|1522946400|1    |
|user2|2018-04-05 18:00:00|null               |1522947600|null      |3    |
|user2|2018-04-05 19:00:00|null               |1522951200|null      |2    |
+-----+-------------------+-------------------+----------+----------+-----+
1 голос
/ 10 марта 2019

Вот, пожалуйста. Код, решающий проблему

val df1 = df.withColumn("sendTimeStamp", unix_timestamp(col("sendTime"))).withColumn("openTimeStamp", unix_timestamp(col("openTime")))

    val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)

    var df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))

    var df3 = df2.select('*, explode('list).as("prevTimeStamp"))

    df3.groupBy('id, 'sendTime).agg(max('openTime).as("openTime"), sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1).otherwise(0)).as("count")).show

Пожалуйста, примите ответ, если он решит.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...