Вот один из подходов, который сначала создает 5-минутные временные окна , собирает список событий для каждого раздела временного окна, а затем применяет udf
, чтобы отметить требуемые события:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import java.sql.Timestamp
val df = Seq(
(Timestamp.valueOf("2018-09-30 21:00:08"), "Event A"),
(Timestamp.valueOf("2018-09-30 21:03:11"), "Event C"),
(Timestamp.valueOf("2018-09-30 21:04:17"), "Event 3"),
(Timestamp.valueOf("2018-09-30 21:05:27"), "Event Y"),
(Timestamp.valueOf("2018-09-30 21:06:11"), "Event 5"),
(Timestamp.valueOf("2018-09-30 21:07:17"), "Event P"),
(Timestamp.valueOf("2018-09-30 21:08:25"), "Event X"),
(Timestamp.valueOf("2018-09-30 21:09:26"), "Event B"),
(Timestamp.valueOf("2018-09-30 21:10:39"), "Event O")
).toDF("Created", "Name")
val winSpec = Window.partitionBy($"Win5m")
def checkEvents(e1: String, e2: String) = udf(
(currEvent: String, events: Seq[String]) =>
events.contains(e1) && events.contains(e2) &&
events.indexOf(e1) < events.indexOf(e2) &&
(currEvent == e1 || currEvent == e2)
)
df.
withColumn("Win5m", window($"Created", "5 minutes")).
withColumn("Events", collect_list($"Name").over(winSpec)).
withColumn("marked", checkEvents("Event Y", "Event X")($"Name", $"Events")).
select($"Created", $"Name").
where($"marked").
show(false)
// +-------------------+-------+
// |Created |Name |
// +-------------------+-------+
// |2018-09-30 21:05:27|Event Y|
// |2018-09-30 21:08:25|Event X|
// +-------------------+-------+
Ниже приведен набор данных с промежуточными столбцами, исключенными из вышеуказанного конечного результата:
// +-------------------+-------+---------------------------------------------+---------------------------------------------+------+
// |Created |Name |Win5m |Events |marked|
// +-------------------+-------+---------------------------------------------+---------------------------------------------+------+
// |2018-09-30 21:00:08|Event A|[2018-09-30 21:00:00.0,2018-09-30 21:05:00.0]|[Event A, Event C, Event 3] |false |
// |2018-09-30 21:03:11|Event C|[2018-09-30 21:00:00.0,2018-09-30 21:05:00.0]|[Event A, Event C, Event 3] |false |
// |2018-09-30 21:04:17|Event 3|[2018-09-30 21:00:00.0,2018-09-30 21:05:00.0]|[Event A, Event C, Event 3] |false |
// |2018-09-30 21:05:27|Event Y|[2018-09-30 21:05:00.0,2018-09-30 21:10:00.0]|[Event Y, Event 5, Event P, Event X, Event B]|true |
// |2018-09-30 21:06:11|Event 5|[2018-09-30 21:05:00.0,2018-09-30 21:10:00.0]|[Event Y, Event 5, Event P, Event X, Event B]|false |
// |2018-09-30 21:07:17|Event P|[2018-09-30 21:05:00.0,2018-09-30 21:10:00.0]|[Event Y, Event 5, Event P, Event X, Event B]|false |
// |2018-09-30 21:08:25|Event X|[2018-09-30 21:05:00.0,2018-09-30 21:10:00.0]|[Event Y, Event 5, Event P, Event X, Event B]|true |
// |2018-09-30 21:09:26|Event B|[2018-09-30 21:05:00.0,2018-09-30 21:10:00.0]|[Event Y, Event 5, Event P, Event X, Event B]|false |
// |2018-09-30 21:10:39|Event O|[2018-09-30 21:10:00.0,2018-09-30 21:15:00.0]|[Event O] |false |
// +-------------------+-------+---------------------------------------------+---------------------------------------------+------+