Попробуйте:
- Сначала создайте DataFrame, заметьте, я добавил две строки с
new_session==2
, чтобы продемонстрировать, что алгоритм работает с более чем двумя типами new_session
.
df_tes = spark.createDataFrame([
(1, "item_1"),
(1, "item_2"),
(0, "item_3"),
(2, "item_7"),
(0, "item_4"),
(2, "item_8"),
(1, "item_1")], ["new_session", "item"])
df_tes.show()
+-----------+------+
|new_session| item|
+-----------+------+
| 1|item_1|
| 1|item_2|
| 0|item_3|
| 2|item_7|
| 0|item_4|
| 2|item_8|
| 1|item_1|
+-----------+------+
Подготовьте данные: мы хотим знать существующий порядок сортировки и сгенерировать правильные идентификаторы, используя строки с
new_session==1
.
# Create a `dummy` column so we know the original sort order for the rows. If
# you already have a column for this, you don't need to create the `dummy` column.
df = df_tes.withColumn('dummy', F.monotonically_increasing_id())
# Create the correct IDs using rows with `new_session == 1`, note we use `dummy` to keep the original order.
df = df.withColumn('id_temp', F.row_number().over(Window.orderBy('dummy').partitionBy('new_session'))).orderBy('dummy')
# Put all rows with new_session != 1 into the same group with 'used == 0'. This
# helps us to handle cases when there are more than two types of `new_session`.
df = df.withColumn('used', F.when(F.col('new_session')==1, 1).otherwise(0))
df.show()
+-----------+------+-----------+-------+----+
|new_session| item| dummy|id_temp|used|
+-----------+------+-----------+-------+----+
| 1|item_1| 8589934592| 1| 1|
| 1|item_2|17179869184| 2| 1|
| 0|item_3|25769803776| 1| 0|
| 2|item_7|34359738368| 1| 0|
| 0|item_4|42949672960| 2| 0|
| 2|item_8|51539607552| 2| 0|
| 1|item_1|60129542144| 3| 1|
+-----------+------+-----------+-------+----+
Генерация окончательных
session_id
с.
# First, for each row, we use `lag` function to get the `id` from its previous
# row. So for one row next to a row of `new_session==1`, it'll pick its correct ID here.
w = Window.orderBy("dummy").rowsBetween(-1, -1)
df = df.withColumn('lag_id', F.lag('id_temp', 1, 1).over(w))
# For the rows with `used==0` (i.e. `new_session!=1`), use `first` to apply the
# correct IDs to all rows.
df = df.withColumn('lag_id', F.first('lag_id').over(Window.partitionBy('used')))
# Now we can use if-else to set the `session_id` properly.
df = df.withColumn('session_id', F.when(F.col('used')==1, F.col('id_temp')).otherwise(F.col('lag_id')))
df.orderBy('dummy').show()
+-----------+------+-----------+-------+----+------+----------+
|new_session| item| dummy|id_temp|used|lag_id|session_id|
+-----------+------+-----------+-------+----+------+----------+
| 1|item_1| 8589934592| 1| 1| 1| 1|
| 1|item_2|17179869184| 2| 1| 1| 2|
| 0|item_3|25769803776| 1| 0| 2| 2|
| 2|item_7|34359738368| 1| 0| 2| 2|
| 0|item_4|42949672960| 2| 0| 2| 2|
| 2|item_8|51539607552| 2| 0| 2| 2|
| 1|item_1|60129542144| 3| 1| 1| 3|
+-----------+------+-----------+-------+----+------+----------+
Вы можете безопасно отбросить вспомогательные столбцы здесь и получить session_id
.