Представьте, что у нас есть набор данных clickstream, содержащий миллионы строк. И мы хотим вычислить уникальный идентификатор сеанса на пользователя в минуту. Пример набора данных :
+------+-------------------+
|userId|clicktime |
+------+-------------------+
|1039 |2009-04-21 13:17:50|
|1039 |2009-04-21 13:17:59|
|1039 |2009-04-21 13:19:59|
|1038 |2009-05-21 13:17:50|
|1037 |2009-05-21 13:17:00|
|1037 |2009-05-21 13:17:50|
|1037 |2009-05-21 13:17:59|
|1037 |2009-05-21 13:19:59|
|1038 |2009-05-21 13:19:59|
|1039 |2009-04-21 13:20:50|
+------+-------------------+
Я написал код в Spark-Scala, чтобы решить эту проблемуно это не оптимальное решение для набора данных с миллионами строк. Я хочу лучшее решение, чем то, что я реализовал. Ниже приведен исходный код моей реализации:
val dfWithLag = rawData
.withColumn("lag", lag(col("clicktime"), 1)
.over(Window.partitionBy("userId") orderBy ("clicktime")).cast("timestamp"))
.withColumn("lag_diff", unix_timestamp($"clicktime") - unix_timestamp($"lag"))
.withColumn("lag_diff", when(col("lag_diff").isNull, 0).otherwise(col("lag_diff")))
.orderBy("userId", "clicktime")
val finalDf = dfWithLag.repartition(col("userId")).mapPartitions(partition => {
var sessionId = scala.util.Random
var currentSessionId = sessionId.nextInt().toInt
val newPartition = partition
.map(record => {
ClickStream(record.getInt(0),record.getTimestamp(1), record.getTimestamp(2),
record.getLong(3), {
val timeDiff = record.getLong(3)
if (timeDiff > 60) {
currentSessionId = sessionId.nextInt.toInt
currentSessionId
}
else if (timeDiff == 0) currentSessionId
else currentSessionId
}
)
}).toList
newPartition.iterator
})
(Encoders.product[ClickStream])
rawData.show(false)
finalDf.drop("lag").drop("lagDiff").show(false)
Вывод кода:
+------+-------------------+-----------+
|userId|clickTime |sessionId |
+------+-------------------+-----------+
|1037 |2009-05-21 13:17:00|1049786501 |
|1037 |2009-05-21 13:17:50|1049786501 |
|1037 |2009-05-21 13:17:59|1049786501 |
|1037 |2009-05-21 13:19:59|-1649908351|
|1039 |2009-04-21 13:17:50|-1794290301|
|1039 |2009-04-21 13:17:59|-1794290301|
|1039 |2009-04-21 13:19:59|668855070 |
|1039 |2009-04-21 13:20:50|668855070 |
|1038 |2009-05-21 13:17:50|1149727960 |
|1038 |2009-05-21 13:19:59|-95969967 |
+------+-------------------+-----------+