выбор и фильтрация искр внутри агрегации - PullRequest
0 голосов
/ 28 мая 2020

Шаг 1: Создание DS

testDf = sparkSession.createDataFrame(
    [
        ['collection_1', 'i1,i2,i3,i4', '1590383291000'],
        ['collection_2', 'i2,i5,i6', '1590469691000'],
        ['collection_3', 'i1,i2', '1590556091000'],
        ['collection_4', 'i3', '1590642491000']
    ]
    , ["collection_id", "item_ids", "ts"]
)
+-------------+-----------+-------------+
|collection_id|   item_ids|           ts|
+-------------+-----------+-------------+
| collection_1|i1,i2,i3,i4|1590383291000|
| collection_2|   i2,i5,i6|1590469691000|
| collection_3|      i1,i2|1590556091000|
| collection_4|         i3|1590642491000|
+-------------+-----------+-------------+

Шаг 2: Промежуточный шаг -> Расчленить, используя item_ids, сгруппировать item_id и создать обратное отображение

explodedDf = testDf.select(
        "collection_id",
        f.split("item_ids", ",").alias('item_ids'),
        f.posexplode(f.split("item_ids", ",")).alias("pos", "item_id"),
        "ts"
    )
+-------------+----------------+---+-------+-------------+                      
|collection_id|        item_ids|pos|item_id|           ts|
+-------------+----------------+---+-------+-------------+
| collection_1|[i1, i2, i3, i4]|  0|     i1|1590383291000|
| collection_1|[i1, i2, i3, i4]|  1|     i2|1590383291000|
| collection_1|[i1, i2, i3, i4]|  2|     i3|1590383291000|
| collection_1|[i1, i2, i3, i4]|  3|     i4|1590383291000|
| collection_2|    [i2, i5, i6]|  0|     i2|1590469691000|
| collection_2|    [i2, i5, i6]|  1|     i5|1590469691000|
| collection_2|    [i2, i5, i6]|  2|     i6|1590469691000|
| collection_3|        [i1, i2]|  0|     i1|1590556091000|
| collection_3|        [i1, i2]|  1|     i2|1590556091000|
| collection_4|            [i3]|  0|     i3|1590642491000|
+-------------+----------------+---+-------+-------------+
explodedDf.groupBy("item_id").agg(f.max("ts").alias("latest_ts"),f.collect_set("collection_id").alias("collection_ids"), f.max("item_ids").alias("item_ids")).show(10)
+-------+-------------+------------------------------------------+----------------+
|item_id|latest_ts    |collection_ids                            |item_ids        |
+-------+-------------+------------------------------------------+----------------+
|i3     |1590642491000|[collection_1, collection_4]              |[i3]            |
|i5     |1590469691000|[collection_2]                            |[i2, i5, i6]    |
|i1     |1590556091000|[collection_3, collection_1]              |[i1, i2, i3, i4]|
|i6     |1590469691000|[collection_2]                            |[i2, i5, i6]    |
|i2     |1590556091000|[collection_3, collection_1, collection_2]|[i2, i5, i6]    |
|i4     |1590383291000|[collection_1]                            |[i1, i2, i3, i4]|
+-------+-------------+------------------------------------------+----------------+

Шаг 3 -> Ожидаемый результат

Вместо выполнения агрегации f.collect_set("collection_id") я хочу выбрать только collection_id с последним ts.

для столбца collection_ids

  • [collection_1, collection_4] -> показать только [collection_4], так как в нем есть последние ts (см. Шаг 1)
  • [collection_2] -> показать [collection_2]
  • [collection_3, collection_1] -> показать только [collection_3], поскольку у него есть последние ts
  • [collection_2] -> показать [collection_2]
  • [collection_3, collection_1, collection_2] -> показать только [collection_3], поскольку у него есть последние ts
  • [collection_1] -> показать [collection_1]

1 Ответ

3 голосов
/ 28 мая 2020

Я думаю, вы можете использовать окно поверх item_id после explode, а затем filter, начиная с исходного df, вы можете попробовать ниже:

w = Window.partitionBy("item_id")

output = testDf.select(
        "collection_id",
        F.split("item_ids", ",").alias('item_ids'),
        F.explode(F.split("item_ids", ",")).alias("item_id"),
        "ts"
    ).withColumn("latest_ts",F.max("ts").over(w)).filter("ts==latest_ts").drop("ts")

output.select(*['item_id','latest_ts','collection_id','item_ids']).show()

+-------+-------------+-------------+----------------+
|item_id|    latest_ts|collection_id|        item_ids|
+-------+-------------+-------------+----------------+
|     i3|1590642491000| collection_4|            [i3]|
|     i5|1590469691000| collection_2|    [i2, i5, i6]|
|     i1|1590556091000| collection_3|        [i1, i2]|
|     i6|1590469691000| collection_2|    [i2, i5, i6]|
|     i2|1590556091000| collection_3|        [i1, i2]|
|     i4|1590383291000| collection_1|[i1, i2, i3, i4]|
+-------+-------------+-------------+----------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...