Как разбить данные на серии в зависимости от условий в Apache Spark - PullRequest
0 голосов
/ 23 февраля 2019

У меня есть данные в следующем формате, отсортированном по отметке времени, каждая строка представляет событие:

+----------+--------+---------+
|event_type|  data  |timestamp|
+----------+--------+---------+
|     A    |    d1  |    1    |
|     B    |    d2  |    2    |
|     C    |    d3  |    3    |
|     C    |    d4  |    4    |
|     C    |    d5  |    5    |
|     A    |    d6  |    6    |
|     A    |    d7  |    7    |
|     B    |    d8  |    8    |
|     C    |    d9  |    9    |
|     B    |    d10 |    12   |
|     C    |    d11 |    20   |
+----------+--------+---------+

Мне нужно собрать эти события в серии следующим образом:
1. Событие типа C помечаетконец серии
2. Если имеется несколько последовательных событий типа C, они попадают в одну серию, а последняя отмечает конец этой серии
3. Каждая серия может охватывать7 дней при макс. , даже если нет события C для его завершения

Обратите также внимание, что в один день может быть несколько серий.На самом деле столбцы отметок времени являются стандартными отметками времени UNIX, здесь цифры для простоты обозначают дни.

Поэтому желаемый результат будет выглядеть следующим образом:

+---------------------+--------------------------------------------------------------------+
|first_event_timestamp|                events: List[(event_type, data,  timestamp)]        |
+---------------------+--------------------------------------------------------------------+
|          1          | List((A, d1, 1), (B, d2, 2), (C, d3, 3),  (C, d4, 4), (C, d5, 5))  |
|          6          | List((A, d6, 6), (A, d7, 7), (B, d8, 8),  (C, d9, 9))              |
|          12         | List((B, d10, 12))                                                 |
|          20         | List((C, d11, 20))                                                 |
+---------------------+--------------------------------------------------------------------+

Я пытался решить эту проблемуиспользуя функции окна, где я бы добавил 2 столбца, например:
1. Событие, помеченное столбцом семени, непосредственно после события типа C с использованием некоторого уникального идентификатора
2. SeriesId был заполнен значениями из столбца семени, используя last ()чтобы пометить все события в одной серии с одинаковым идентификатором
3. Затем я бы сгруппировал события по SeriesId

К сожалению, это не представляется возможным:

+----------+--------+---------+------+-----------+
|event_type|  data  |timestamp| seed | series_id |
+----------+--------+---------+------+-----------+
|     A    |    d1  |    1    | null |    null   |
|     B    |    d2  |    2    | null |    null   |
|     C    |    d3  |    3    | null |    null   |
|     C    |    d4  |    4    |   0  |     0     |     
|     C    |    d5  |    5    |   1  |     1     |
|     A    |    d6  |    6    |   2  |     2     |
|     A    |    d7  |    7    | null |     2     |
|     B    |    d8  |    8    | null |     2     |
|     C    |    d9  |    9    | null |     2     |
|     B    |    d10 |    12   |   3  |     3     |
|     C    |    d11 |    20   | null |     3     |
+----------+--------+---------+------+-----------+
  1. Я не могу проверить предыдущую строку на равенство, используя lag (), то есть следующий код:
df.withColumn(
    "seed",
    when(
        (lag($"eventType", 1) === ventType.Conversion).over(w), 
        typedLit(DigestUtils.sha256Hex("some fields").substring(0, 32))
    )
)

throws

org.apache.spark.sql.AnalysisException: Expression '(lag (eventType # 76, 1, null) = C)' не поддерживается в оконной функции.

Как видно из таблицы, она не срабатывает в случае, когда есть несколько последовательных событий C, а также не будет работать для первой и последней серии.

Я застрял здесь, любая помощь будет принята (использование Dataframe / dataset api предпочтительнее).

1 Ответ

0 голосов
/ 24 февраля 2019

Вот подход

  1. Определение начала серии событий на основе условий
  2. Пометка записи как стартового события
  3. выбор записей запускасобытия
  4. получают дату окончания записи (если мы заказываем записи начальных событий desc, то предыдущее время начала будет текущим временем окончания серии)
  5. объединяет исходные данные с указанным выше набором данных

Здесь есть udf, чтобы пометить запись как «начало»

//tag the starting event, based on the conditions
 def tagStartEvent : (String,String,Int,Int) => String = (prevEvent:String,currEvent:String,prevTimeStamp:Int,currTimeStamp:Int)=>{
   //very first event is tagged as "start"
   if (prevEvent == "start")
     "start"
   else if ((currTimeStamp - prevTimeStamp) > 7 )
     "start"
   else {
     prevEvent match {
       case "C" =>
         if (currEvent == "A")
           "start"
         else if (currEvent == "B")
           "start"
         else // if current event C
           ""
       case _ => ""
     }
   }
 }
val tagStartEventUdf = udf(tagStartEvent)

data.csv

event_type,data,timestamp
A,d1,1
B,d2,2
C,d3,3
C,d4,4
C,d5,5
A,d6,6
A,d7,7
B,d8,8
C,d9,9
B,d10,12
C,d11,20
val df = spark.read.format("csv")
                  .option("header", "true")
                  .option("inferSchema", "true")
                  .load("data.csv")

    val window = Window.partitionBy("all").orderBy("timestamp")

    //tag the starting event
    val dfStart =
        df.withColumn("all", lit(1))
          .withColumn("series_start",
            tagStartEventUdf(
              lag($"event_type",1, "start").over(window), df("event_type"),
              lag($"timestamp",1,1).over(window),df("timestamp")))

    val dfStartSeries = dfStart.filter($"series_start" === "start").select(($"timestamp").as("series_start_time"),$"all")

    val window2 = Window.partitionBy("all").orderBy($"series_start_time".desc)
    //get the series end times
    val dfSeriesTimes = dfStartSeries.withColumn("series_end_time",lag($"series_start_time",1,null).over(window2)).drop($"all")

    val dfSeries =
          df.join(dfSeriesTimes).withColumn("timestamp_series",
              // if series_end_time is null and  timestamp >= series_start_time, then series_start_time
              when(col("series_end_time").isNull && col("timestamp") >= col("series_start_time"), col("series_start_time"))
                // if record greater or equal to series_start_time, and less than series_end_time, then series_start_time
                .otherwise(when((col("timestamp") >= col("series_start_time") && col("timestamp") < col("series_end_time")), col("series_start_time")).otherwise(null)))
                .filter($"timestamp_series".isNotNull)

   dfSeries.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...