Шаг 1: Создание DS
testDf = sparkSession.createDataFrame(
[
['collection_1', 'i1,i2,i3,i4', '1590383291000'],
['collection_2', 'i2,i5,i6', '1590469691000'],
['collection_3', 'i1,i2', '1590556091000'],
['collection_4', 'i3', '1590642491000']
]
, ["collection_id", "item_ids", "ts"]
)
+-------------+-----------+-------------+
|collection_id| item_ids| ts|
+-------------+-----------+-------------+
| collection_1|i1,i2,i3,i4|1590383291000|
| collection_2| i2,i5,i6|1590469691000|
| collection_3| i1,i2|1590556091000|
| collection_4| i3|1590642491000|
+-------------+-----------+-------------+
Шаг 2: Промежуточный шаг -> Расчленить, используя item_ids
, сгруппировать item_id
и создать обратное отображение
explodedDf = testDf.select(
"collection_id",
f.split("item_ids", ",").alias('item_ids'),
f.posexplode(f.split("item_ids", ",")).alias("pos", "item_id"),
"ts"
)
+-------------+----------------+---+-------+-------------+
|collection_id| item_ids|pos|item_id| ts|
+-------------+----------------+---+-------+-------------+
| collection_1|[i1, i2, i3, i4]| 0| i1|1590383291000|
| collection_1|[i1, i2, i3, i4]| 1| i2|1590383291000|
| collection_1|[i1, i2, i3, i4]| 2| i3|1590383291000|
| collection_1|[i1, i2, i3, i4]| 3| i4|1590383291000|
| collection_2| [i2, i5, i6]| 0| i2|1590469691000|
| collection_2| [i2, i5, i6]| 1| i5|1590469691000|
| collection_2| [i2, i5, i6]| 2| i6|1590469691000|
| collection_3| [i1, i2]| 0| i1|1590556091000|
| collection_3| [i1, i2]| 1| i2|1590556091000|
| collection_4| [i3]| 0| i3|1590642491000|
+-------------+----------------+---+-------+-------------+
explodedDf.groupBy("item_id").agg(f.max("ts").alias("latest_ts"),f.collect_set("collection_id").alias("collection_ids"), f.max("item_ids").alias("item_ids")).show(10)
+-------+-------------+------------------------------------------+----------------+
|item_id|latest_ts |collection_ids |item_ids |
+-------+-------------+------------------------------------------+----------------+
|i3 |1590642491000|[collection_1, collection_4] |[i3] |
|i5 |1590469691000|[collection_2] |[i2, i5, i6] |
|i1 |1590556091000|[collection_3, collection_1] |[i1, i2, i3, i4]|
|i6 |1590469691000|[collection_2] |[i2, i5, i6] |
|i2 |1590556091000|[collection_3, collection_1, collection_2]|[i2, i5, i6] |
|i4 |1590383291000|[collection_1] |[i1, i2, i3, i4]|
+-------+-------------+------------------------------------------+----------------+
Шаг 3 -> Ожидаемый результат
Вместо выполнения агрегации f.collect_set("collection_id")
я хочу выбрать только collection_id с последним ts
.
для столбца collection_ids
- [collection_1, collection_4] -> показать только [collection_4], так как в нем есть последние ts (см. Шаг 1)
- [collection_2] -> показать [collection_2]
- [collection_3, collection_1] -> показать только [collection_3], поскольку у него есть последние ts
- [collection_2] -> показать [collection_2]
- [collection_3, collection_1, collection_2] -> показать только [collection_3], поскольку у него есть последние ts
- [collection_1] -> показать [collection_1]