Как рассчитать уникальный идентификатор сеанса на пользователя в минуту в наборе данных потока кликов с помощью Spark-SQL? - PullRequest
2 голосов
/ 24 октября 2019

Представьте, что у нас есть набор данных clickstream, содержащий миллионы строк. И мы хотим вычислить уникальный идентификатор сеанса на пользователя в минуту. Пример набора данных :

+------+-------------------+
|userId|clicktime          |
+------+-------------------+
|1039  |2009-04-21 13:17:50|
|1039  |2009-04-21 13:17:59|
|1039  |2009-04-21 13:19:59|
|1038  |2009-05-21 13:17:50|
|1037  |2009-05-21 13:17:00|
|1037  |2009-05-21 13:17:50|
|1037  |2009-05-21 13:17:59|
|1037  |2009-05-21 13:19:59|
|1038  |2009-05-21 13:19:59|
|1039  |2009-04-21 13:20:50|
+------+-------------------+

Я написал код в Spark-Scala, чтобы решить эту проблемуно это не оптимальное решение для набора данных с миллионами строк. Я хочу лучшее решение, чем то, что я реализовал. Ниже приведен исходный код моей реализации:

val dfWithLag = rawData
      .withColumn("lag", lag(col("clicktime"), 1)
        .over(Window.partitionBy("userId") orderBy ("clicktime")).cast("timestamp"))
      .withColumn("lag_diff", unix_timestamp($"clicktime") - unix_timestamp($"lag"))
      .withColumn("lag_diff", when(col("lag_diff").isNull, 0).otherwise(col("lag_diff")))
      .orderBy("userId", "clicktime")


    val finalDf = dfWithLag.repartition(col("userId")).mapPartitions(partition => {
      var sessionId = scala.util.Random
      var currentSessionId = sessionId.nextInt().toInt
      val newPartition = partition
        .map(record => {
          ClickStream(record.getInt(0),record.getTimestamp(1), record.getTimestamp(2),
            record.getLong(3), {
              val timeDiff = record.getLong(3)
              if (timeDiff > 60) {
                currentSessionId = sessionId.nextInt.toInt
                currentSessionId
              }
              else if (timeDiff == 0) currentSessionId
              else currentSessionId
            }
          )
        }).toList
      newPartition.iterator
    })
    (Encoders.product[ClickStream])

    rawData.show(false)
    finalDf.drop("lag").drop("lagDiff").show(false)

Вывод кода:

+------+-------------------+-----------+
|userId|clickTime          |sessionId  |
+------+-------------------+-----------+
|1037  |2009-05-21 13:17:00|1049786501 |
|1037  |2009-05-21 13:17:50|1049786501 |
|1037  |2009-05-21 13:17:59|1049786501 |
|1037  |2009-05-21 13:19:59|-1649908351|
|1039  |2009-04-21 13:17:50|-1794290301|
|1039  |2009-04-21 13:17:59|-1794290301|
|1039  |2009-04-21 13:19:59|668855070  |
|1039  |2009-04-21 13:20:50|668855070  |
|1038  |2009-05-21 13:17:50|1149727960 |
|1038  |2009-05-21 13:19:59|-95969967  |
+------+-------------------+-----------+

1 Ответ

0 голосов
/ 24 октября 2019

Вы можете просто получить дату без секунд, используя date_format, а затем создать свой уникальный sessionId с помощью функции hash. В вашем примере вы используете scala.util.Random без начального числа, и из-за этого ваш идентификатор сессии может быть не уникальным.

df.withColumn("sessionId", hash('userId, date_format('clicktime,"yyyy-MM-dd HH:mm"))).show()
    +------+-------------------+
    |userId|          clicktime|
    +------+-------------------+
    |  1039|2009-04-21 13:17:50|
    |  1039|2009-04-21 13:17:59|
    |  1039|2009-04-21 13:19:59|
    |  1038|2009-05-21 13:17:50|
    |  1037|2009-05-21 13:17:00|
    |  1037|2009-05-21 13:17:50|
    |  1037|2009-05-21 13:17:59|
    |  1037|2009-05-21 13:19:59|
    |  1038|2009-05-21 13:19:59|
    |  1039|2009-04-21 13:20:50|
    +------+-------------------+

    +------+-------------------+-----------+
    |userId|          clicktime|  sessionId|
    +------+-------------------+-----------+
    |  1039|2009-04-21 13:17:50|-1768577078|
    |  1039|2009-04-21 13:17:59|-1768577078|
    |  1039|2009-04-21 13:19:59| -443001140|
    |  1038|2009-05-21 13:17:50| 1660590339|
    |  1037|2009-05-21 13:17:00| 1360561347|
    |  1037|2009-05-21 13:17:50| 1360561347|
    |  1037|2009-05-21 13:17:59| 1360561347|
    |  1037|2009-05-21 13:19:59|  925508976|
    |  1038|2009-05-21 13:19:59| 1148270137|
    |  1039|2009-04-21 13:20:50| 1342597130|
    +------+-------------------+-----------+


...