Мне не известны какие-либо встроенные функции Spark, которые могут последовательно идентифицировать начало следующего 24-часового сеанса (или любого заданного периода времени) в зависимости от того, где предыдущий сеанс заканчивается динамически c. Один из подходов к выполнению такого требования заключается в использовании UDF, которая использует функцию Scala fold
:
def dupeFlags(tLimit: Long) = udf{ (logins: Seq[String], tsDiffs: Seq[Long]) =>
val flags = tsDiffs.foldLeft( (List[String](), 0L) ){ case ((flags, tsAcc), ts) =>
if (ts == 0 || tsAcc + ts > tLimit)
("N" :: flags, 0L)
else
("Y" :: flags, tsAcc + ts)
}._1.reverse
logins zip flags
}
UDF принимает список time-diff
(в секундах между текущей и предыдущей строками) для быть обработанным. Обратите внимание, что аккумулятор для foldLeft
в UDF является кортежем (flags, tsA cc), где:
flags
- это список дублирующихся флагов, которые должны быть возвращены tsAcc
предназначен для переноса условно-кумулятивного значения метки времени на следующую итерацию
Также обратите внимание, что список login-date
только «пропущен через» для включения в окончательный набор данных.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq(
("user1", "12/1/19 8:00"),
("user1", "12/1/19 10:00"),
("user1", "12/1/19 23:00"),
("user1", "12/2/19 7:00"),
("user1", "12/2/19 8:00"),
("user1", "12/2/19 10:00"),
("user1", "12/3/19 9:00"),
("user1", "12/3/19 23:00"),
("user1", "12/4/19 7:00"),
("user2", "12/4/19 8:00"),
("user2", "12/5/19 5:00"),
("user2", "12/6/19 0:00")
).toDF("user", "login")
Используя groupBy/collect_list
, список time-diff
вместе со списком login-date
подается в UDF для генерации требуемых флагов-дубликатов, которые затем сглаживаются с помощью explode
:
val win1 = Window.partitionBy("user").orderBy("ts")
df.
withColumn("ts", unix_timestamp(to_timestamp($"login", "MM/dd/yy HH:mm"))).
withColumn("tsPrev", coalesce(lag($"ts", 1).over(win1), $"ts")).
groupBy("user").agg(collect_list($"login").as("logins"), collect_list($"ts" - $"tsPrev").as("tsDiffs")).
withColumn("tuple", explode(dupeFlags(60 * 60 * 24L)($"logins", $"tsDiffs"))).
select($"user", $"tuple._1".as("login"), $"tuple._2".as("duplicate")).
show
// +-----+-------------+---------+
// | user| login|duplicate|
// +-----+-------------+---------+
// |user1| 12/1/19 8:00| N|
// |user1|12/1/19 10:00| Y|
// |user1|12/1/19 23:00| Y|
// |user1| 12/2/19 7:00| Y|
// |user1| 12/2/19 8:00| Y|
// |user1|12/2/19 10:00| N|
// |user1| 12/3/19 9:00| Y|
// |user1|12/3/19 23:00| N|
// |user1| 12/4/19 7:00| Y|
// |user2| 12/4/19 8:00| N|
// |user2| 12/5/19 5:00| Y|
// |user2| 12/6/19 0:00| N|
// +-----+-------------+---------+