Spark Advanced Window с динамическим последним - PullRequest
0 голосов
/ 13 февраля 2019

Проблема: учитывая, что данные временного ряда, которые являются потоком кликов по активности пользователя, хранятся в кусте, попросите обогатить данные идентификатором сеанса с помощью spark.

Определение сеанса

  • Сессия истекает после 1 часа бездействия
  • Сессия остается активной в течение 2 часов

Данные:

click_time,user_id
2018-01-01 11:00:00,u1
2018-01-01 12:10:00,u1
2018-01-01 13:00:00,u1
2018-01-01 13:50:00,u1
2018-01-01 14:40:00,u1
2018-01-01 15:30:00,u1
2018-01-01 16:20:00,u1
2018-01-01 16:50:00,u1
2018-01-01 11:00:00,u2
2018-01-02 11:00:00,u2

Ниже приведено частичное решение, учитывающее только1-й пункт в определении сеанса:

val win1 = Window.partitionBy("user_id").orderBy("click_time")
    val sessionnew = when((unix_timestamp($"click_time") - unix_timestamp(lag($"click_time",1,"2017-01-01 11:00:00.0").over(win1)))/60 >= 60, 1).otherwise(0)
    userActivity
      .withColumn("session_num",sum(sessionnew).over(win1))
      .withColumn("session_id",concat($"user_id", $"session_num"))
      .show(truncate = false)

Фактический вывод:

+---------------------+-------+-----------+----------+
|click_time           |user_id|session_num|session_id|
+---------------------+-------+-----------+----------+
|2018-01-01 11:00:00.0|u1     |1          |u11       |
|2018-01-01 12:10:00.0|u1     |2          |u12       | -- session u12 starts
|2018-01-01 13:00:00.0|u1     |2          |u12       |
|2018-01-01 13:50:00.0|u1     |2          |u12       |
|2018-01-01 14:40:00.0|u1     |2          |u12       | -- this should be a new session as diff of session start of u12 and this row exceeds 2 hours
|2018-01-01 15:30:00.0|u1     |2          |u12       |
|2018-01-01 16:20:00.0|u1     |2          |u12       |
|2018-01-01 16:50:00.0|u1     |2          |u12       | -- now this has to be compared with row 5 to find difference
|2018-01-01 11:00:00.0|u2     |1          |u21       |
|2018-01-02 11:00:00.0|u2     |2          |u22       |
+---------------------+-------+-----------+----------+

Чтобы включить второе условие, я попытался найти разницу между текущим временем и временем начала последнего сеанса, чтобы проверить,это превышает 2 часа, но сама ссылка изменяется для следующих строк.Вот некоторые примеры использования, которые могут быть достигнуты с помощью промежуточной суммы, но здесь это не подходит.

1 Ответ

0 голосов
/ 13 февраля 2019

Не простая задача, которую нужно решить, но вот один из подходов:

  1. Использование Window lag разница меток времени для идентификации сеансов (с 0 = начало сеанса) для пользователя для rule #1
  2. Сгруппировать набор данных, чтобы собрать список различий временных меток для пользователя
  3. Обработать с помощью UDF список различий временных меток, чтобы идентифицировать сеансы для rule #2 и создать все идентификаторы сеансов для каждого пользователя
  4. Разверните сгруппированный набор данных с помощью Spark's explode

Пример кода ниже:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val userActivity = Seq(
  ("2018-01-01 11:00:00", "u1"),
  ("2018-01-01 12:10:00", "u1"),
  ("2018-01-01 13:00:00", "u1"),
  ("2018-01-01 13:50:00", "u1"),
  ("2018-01-01 14:40:00", "u1"),
  ("2018-01-01 15:30:00", "u1"),
  ("2018-01-01 16:20:00", "u1"),
  ("2018-01-01 16:50:00", "u1"),
  ("2018-01-01 11:00:00", "u2"),
  ("2018-01-02 11:00:00", "u2")
).toDF("click_time", "user_id")

def clickSessList(tmo: Long) = udf{ (uid: String, clickList: Seq[String], tsList: Seq[Long]) =>
  def sid(n: Long) = s"$uid-$n"

  val sessList = tsList.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
    if (i == 0) (sid(k + 1) :: ls, 0L, k + 1) else
       if (j + i < tmo) (sid(k) :: ls, j + i, k) else
         (sid(k + 1) :: ls, 0L, k + 1)
  }._1.reverse

  clickList zip sessList
}

Обратите внимание, что аккумулятор для foldLeft в UDF является кортежем (ls, j, k), где:

  • ls - список форматированных идентификаторов сеансов, которые должны быть возвращены
  • j и k предназначены для переноса условно изменяющегося значения метки времени иИдентификационный номер сеанса, соответственно, до следующей итерации

Шаг 1:

val tmo1: Long = 60 * 60
val tmo2: Long = 2 * 60 * 60

val win1 = Window.partitionBy("user_id").orderBy("click_time")

val df1 = userActivity.
  withColumn("ts_diff", unix_timestamp($"click_time") - unix_timestamp(
    lag($"click_time", 1).over(win1))
  ).
  withColumn("ts_diff", when(row_number.over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
    otherwise($"ts_diff")
  )

df1.show
// +-------------------+-------+-------+
// |         click_time|user_id|ts_diff|
// +-------------------+-------+-------+
// |2018-01-01 11:00:00|     u1|      0|
// |2018-01-01 12:10:00|     u1|      0|
// |2018-01-01 13:00:00|     u1|   3000|
// |2018-01-01 13:50:00|     u1|   3000|
// |2018-01-01 14:40:00|     u1|   3000|
// |2018-01-01 15:30:00|     u1|   3000|
// |2018-01-01 16:20:00|     u1|   3000|
// |2018-01-01 16:50:00|     u1|   1800|
// |2018-01-01 11:00:00|     u2|      0|
// |2018-01-02 11:00:00|     u2|      0|
// +-------------------+-------+-------+

Шаги 2 - 4:

val df2 = df1.
  groupBy("user_id").agg(
    collect_list($"click_time").as("click_list"), collect_list($"ts_diff").as("ts_list")
  ).
  withColumn("click_sess_id",
    explode(clickSessList(tmo2)($"user_id", $"click_list", $"ts_list"))
  ).
  select($"user_id", $"click_sess_id._1".as("click_time"), $"click_sess_id._2".as("sess_id"))

df2.show
// +-------+-------------------+-------+
// |user_id|click_time         |sess_id|
// +-------+-------------------+-------+
// |u1     |2018-01-01 11:00:00|u1-1   |
// |u1     |2018-01-01 12:10:00|u1-2   |
// |u1     |2018-01-01 13:00:00|u1-2   |
// |u1     |2018-01-01 13:50:00|u1-2   |
// |u1     |2018-01-01 14:40:00|u1-3   |
// |u1     |2018-01-01 15:30:00|u1-3   |
// |u1     |2018-01-01 16:20:00|u1-3   |
// |u1     |2018-01-01 16:50:00|u1-4   |
// |u2     |2018-01-01 11:00:00|u2-1   |
// |u2     |2018-01-02 11:00:00|u2-2   |
// +-------+-------------------+-------+

Также обратите внимание, что click_time "пропущено" в шагах 2 - 4, чтобы быть включеннымв окончательном наборе данных.

...