Функция Spark Window последняя не нулевое значение - PullRequest
2 голосов
/ 17 июня 2019

У нас есть база данных временных рядов для пользовательских событий, которая выглядит следующим образом:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:33:31   user_a      choose_ticket    ticke_b        NULL
2019-06-06 14:34:31   user_b      choose_ticket    ticke_f        NULL
2019-06-06 14:36:31   user_a      booing_error     NULL           error_c  
2019-06-06 14:37:31   user_a      choose_ticket    ticke_h        NULL
2019-06-06 14:38:31   user_a      booing_error     NULL           error_d
2019-06-06 14:39:31   user_a      booing_error     NULL           error_e

Это один пример использования, который нам нужен:

Чтобы выяснить, какой билетТип вызывает некоторую ошибку при бронировании, нам нужно будет посмотреть тип билета, который доступен только для более раннего события choose_ticket.

В этом случае мы ищем каждое событие booking_errorнайдите предыдущее событие choose_ticket для того же пользователя и объедините в нем тип заявки с событием booking_error.

В идеале мы хотим получить вывод:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:36:31   user_a      booing_error     ticke_b        error_c  
2019-06-06 14:38:31   user_a      booing_error     ticke_h        error_d
2019-06-06 14:39:31   user_a      booing_error     ticke_h        error_e

Самое близкое, что я могу найти, это Spark добавить новый столбец в фрейм данных со значением из предыдущей строки , что позволяет нам взять свойства из предыдущего события и применить его к событию сразу после.

Это почти работает, за исключением того, что при наличии нескольких событий (booing_error в этом примере) только самые первые могут получить необходимые свойства в этом случае.например, это то, что мы получим с решением по ссылке SO выше:

timestamp             user_id     event            ticke_type     error_type 
2019-06-06 14:36:31   user_a      booing_error     ticke_b        error_c  
2019-06-06 14:38:31   user_a      booing_error     ticke_h        error_d
2019-06-06 14:39:31   user_a      booing_error     NULL           error_e

Чтобы подвести итог, для данной строки, как найти предыдущую строку, соответствующую определенным критериям, и "cherry-pick" еесобственность закончилась?

Какой лучший способ сделать это?

Ответы [ 2 ]

2 голосов
/ 18 июня 2019

org.apache.spark.sql.functions.last - это то, что вы ищете. Вы можете переименовать «ближайший» столбец, чтобы заменить ticke_type в конце.

scala> df.show
+-------------------+-------+-------------+----------+----------+
|          timestamp|user_id|        event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket|   ticke_b|      null|
|2019-06-06 14:34:31| user_b|choose_ticket|   ticke_f|      null|
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|
|2019-06-06 14:37:31| user_a|choose_ticket|   ticke_h|      null|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|
+-------------------+-------+-------------+----------+----------+

scala> val overColumns = Window.partitionBy("user_id").orderBy("timestamp")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@70dc8c9a

scala> df.withColumn("closest", 
  org.apache.spark.sql.functions.last("ticke_type", true).over(overColumns)).filter($"event" === "booking_error").show
+-------------------+-------+-------------+----------+----------+-------+
|          timestamp|user_id|        event|ticke_type|error_type|closest|
+-------------------+-------+-------------+----------+----------+-------+
|2019-06-06 14:36:31| user_a|booking_error|      null|   error_c|ticke_b|
|2019-06-06 14:38:31| user_a|booking_error|      null|   error_d|ticke_h|
|2019-06-06 14:39:31| user_a|booking_error|      null|   error_e|ticke_h|
+-------------------+-------+-------------+----------+----------+-------+
1 голос
/ 19 июня 2019

Вот версия pyspark

 df = self.spark.createDataFrame(
            [('2019-06-06 14:33:31', 'user_a', 'choose_ticket', 'ticke_b', None),
             ('2019-06-06 14:34:31', 'user_b', 'choose_ticket', 'ticke_f', None),
             ('2019-06-06 14:36:31', 'user_a', 'booing_error', None, 'error_c'),
             ('2019-06-06 14:37:31', 'user_a', 'choose_ticket', 'ticke_h', None),
             ('2019-06-06 14:38:31', 'user_a', 'booing_error', None, 'error_d'),
             ('2019-06-06 14:39:31', 'user_a', 'booing_error', None, 'error_e'),
             ],
            ("timestamp", "user_id", "event", "ticke_type", "error_type"))

        df.show()

        window_spec = Window.partitionBy(col("user_id")).orderBy(col("timestamp"))

        df = df.withColumn('ticke_type_forwardfill', when(col("event") == "choose_ticket", col("ticke_type")) \
                           .otherwise(last("ticke_type", True).over(window_spec))) \
            .drop(col("ticke_type")) \
            .filter(col("event") == "booing_error")

        df.show()

результат

+-------------------+-------+-------------+----------+----------+
|          timestamp|user_id|        event|ticke_type|error_type|
+-------------------+-------+-------------+----------+----------+
|2019-06-06 14:33:31| user_a|choose_ticket|   ticke_b|      null|
|2019-06-06 14:34:31| user_b|choose_ticket|   ticke_f|      null|
|2019-06-06 14:36:31| user_a| booing_error|      null|   error_c|
|2019-06-06 14:37:31| user_a|choose_ticket|   ticke_h|      null|
|2019-06-06 14:38:31| user_a| booing_error|      null|   error_d|
|2019-06-06 14:39:31| user_a| booing_error|      null|   error_e|
+-------------------+-------+-------------+----------+----------+

+-------------------+-------+------------+----------+----------------------+
|          timestamp|user_id|       event|error_type|ticke_type_forwardfill|
+-------------------+-------+------------+----------+----------------------+
|2019-06-06 14:36:31| user_a|booing_error|   error_c|               ticke_b|
|2019-06-06 14:38:31| user_a|booing_error|   error_d|               ticke_h|
|2019-06-06 14:39:31| user_a|booing_error|   error_e|               ticke_h|
+-------------------+-------+------------+----------+----------------------+



Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...