Свеча помечает повторяющийся логин пользователя в течение 24 часов после первого входа - PullRequest
0 голосов
/ 23 марта 2020

У меня есть набор данных с пользователями и временем входа. Мне нужно пометить дубликат, если есть / дополнительные логины в течение 24 часов после первого входа. Откроется окно активности с логином пользователя. Например, вот примерный набор данных

user    login
-----------------------------
user1   12/1/19 8:00
user1   12/1/19 10:00
user1   12/1/19 23:00
user1   12/2/19 7:00
user1   12/2/19 8:00
user1   12/2/19 10:00
user1   12/3/19 23:00
user1   12/4/19 7:00
user2   12/4/19 8:00
user2   12/5/19 5:00
user2   12/6/19 0:00

Ожидаемый результат

user    login           Duplicate
---------------------------------
user1   12/1/19 8:00    N    this is first login for user1 - 24 hour window opens here
user1   12/1/19 10:00   Y    since this is within 24 hours 
user1   12/1/19 23:00   Y   
user1   12/2/19 7:00    Y
user1   12/2/19 8:00    Y
user1   12/2/19 10:00   N   This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
user1   12/3/19 23:00   N
user1   12/4/19 7:00    Y
user2   12/4/19 8:00    N
user2   12/5/19 5:00    Y
user2   12/6/19 0:00    N

Я посмотрел Spark SQL оконная функция со сложным условием , но это решение победило ' не работает, если пользовательские входы в систему с фиксированным интервалом, скажем, каждые 18 часов.

Вот еще один пример (если решение учитывает только первое действие для расчета 24-часового окна, оно даст неверный результат (не дублируется) для записи # 7 ниже)

user1   12/1/19 8:00    N    this is first login for user1 - 24 hour window opens here
user1   12/1/19 10:00   Y    since this is within 24 hours 
user1   12/1/19 23:00   Y   
user1   12/2/19 7:00    Y
user1   12/2/19 8:00    Y
user1   12/2/19 10:00   N  This activity time is greater than (previous window open + 24 hrs). previous window closes and second window opens here
**user1   12/3/19 09:00 N**
user1   12/3/19 23:00   N
user1   12/4/19 7:00    Y
user2   12/4/19 8:00    N
user2   12/5/19 5:00    Y
user2   12/6/19 0:00    N

Ответы [ 2 ]

1 голос
/ 23 марта 2020

Мне не известны какие-либо встроенные функции Spark, которые могут последовательно идентифицировать начало следующего 24-часового сеанса (или любого заданного периода времени) в зависимости от того, где предыдущий сеанс заканчивается динамически c. Один из подходов к выполнению такого требования заключается в использовании UDF, которая использует функцию Scala fold:

def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
  val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
    if (ts == 0 || tsAcc + ts > tLimit)
      ("N" :: flags, 0L)
    else
      ("Y" :: flags, tsAcc + ts)
  }._1.reverse
  logins zip flags
}

UDF принимает список time-diff (в секундах между текущей и предыдущей строками) для быть обработанным. Обратите внимание, что аккумулятор для foldLeft в UDF является кортежем (flags, tsA cc), где:

  • flags - это список дублирующихся флагов, которые должны быть возвращены
  • tsAcc предназначен для переноса условно-кумулятивного значения метки времени на следующую итерацию

Также обратите внимание, что список login-date только «пропущен через» для включения в окончательный набор данных.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("user1", "12/1/19 8:00"),
  ("user1", "12/1/19 10:00"),
  ("user1", "12/1/19 23:00"),
  ("user1", "12/2/19 7:00"),
  ("user1", "12/2/19 8:00"),
  ("user1", "12/2/19 10:00"),
  ("user1", "12/3/19 9:00"),
  ("user1", "12/3/19 23:00"),
  ("user1", "12/4/19 7:00"),
  ("user2", "12/4/19 8:00"),
  ("user2", "12/5/19 5:00"),
  ("user2", "12/6/19 0:00")
).toDF("user", "login")

Используя groupBy/collect_list, список time-diff вместе со списком login-date подается в UDF для генерации требуемых флагов-дубликатов, которые затем сглаживаются с помощью explode:

val win1 = Window.partitionBy("user").orderBy("ts")

df.
  withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
  withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).
  groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
  withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).
  select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
  show
// +-----+-------------+---------+
// | user|        login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00|        N|
// |user1|12/1/19 10:00|        Y|
// |user1|12/1/19 23:00|        Y|
// |user1| 12/2/19 7:00|        Y|
// |user1| 12/2/19 8:00|        Y|
// |user1|12/2/19 10:00|        N|
// |user1| 12/3/19 9:00|        Y|
// |user1|12/3/19 23:00|        N|
// |user1| 12/4/19 7:00|        Y|
// |user2| 12/4/19 8:00|        N|
// |user2| 12/5/19 5:00|        Y|
// |user2| 12/6/19 0:00|        N|
// +-----+-------------+---------+
0 голосов
/ 23 марта 2020

Python: Вот преобразование моего scala кода.

from pyspark.sql.functions import col, lag, unix_timestamp, to_timestamp, lit, when, row_number, first
from pyspark.sql import Window

w = Window.partitionBy("user", "index").orderBy("login")
df2 = df.withColumn("login", to_timestamp(col("login"), "MM/dd/yy HH:mm"))

df2.join(df2.groupBy("user").agg(first("login").alias("firstLogin")), "user", "left") \
   .withColumn("index", ((unix_timestamp(col("login")) - unix_timestamp(col("firstLogin"))) / 86400).cast("int")) \
   .withColumn("Duplicate", when(row_number().over(w) == 1, lit("N")).otherwise(lit("Y"))) \
   .orderBy("user", "login") \
   .show(20)

Scala: Вы можете использовать функцию lag со столбцом индекса разницы во времени, как я сделал,

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("user", "index").orderBy("login")
val df2 = df.withColumn("login", to_timestamp($"login", "MM/dd/yy HH:mm"))

df2.join(df2.groupBy("user").agg(first("login").as("firstLogin")), Seq("user"), "left")
   .withColumn("index", ((unix_timestamp(col("login")) - unix_timestamp(col("firstLogin"))) / 86400).cast("int"))
   .withColumn("Duplicate", when(row_number.over(w) === 1, lit("N")).otherwise(lit("Y")))
   .orderBy("user", "login")

   .show

Результат:

+-----+-------------------+-------------------+-----+---------+
| user|              login|         firstLogin|index|Duplicate|
+-----+-------------------+-------------------+-----+---------+
|user1|2019-12-01 08:00:00|2019-12-01 08:00:00|    0|        N|
|user1|2019-12-01 10:00:00|2019-12-01 08:00:00|    0|        Y|
|user1|2019-12-01 23:00:00|2019-12-01 08:00:00|    0|        Y|
|user1|2019-12-02 07:00:00|2019-12-01 08:00:00|    0|        Y|
|user1|2019-12-02 08:00:00|2019-12-01 08:00:00|    1|        N|
|user1|2019-12-02 10:00:00|2019-12-01 08:00:00|    1|        Y|
|user1|2019-12-03 23:00:00|2019-12-01 08:00:00|    2|        N|
|user1|2019-12-04 07:00:00|2019-12-01 08:00:00|    2|        Y|
|user2|2019-12-04 08:00:00|2019-12-04 08:00:00|    0|        N|
|user2|2019-12-05 05:00:00|2019-12-04 08:00:00|    0|        Y|
|user2|2019-12-06 00:00:00|2019-12-04 08:00:00|    1|        N|
+-----+-------------------+-------------------+-----+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...