Соединение двух фреймов данных pyspark по уникальным значениям в столбце - PullRequest
0 голосов
/ 16 октября 2019

Допустим, у меня есть два фрейма данных pyspark, users и shops. Несколько примеров строк для обоих фреймов данных показаны ниже.

пользователи фрейм данных:

+---------+-------------+---------+
| idvalue | day-of-week | geohash |
+---------+-------------+---------+
| id-1    |           2 | gcutjjn |
| id-1    |           3 | gcutjjn |
| id-1    |           5 | gcutjht |
+---------+-------------+---------+

магазины фрейм данных

+---------+-----------+---------+
| shop-id | shop-name | geohash |
+---------+-----------+---------+
| sid-1   | kfc       | gcutjjn |
| sid-2   | mcd       | gcutjhq |
| sid-3   | starbucks | gcutjht |
+---------+-----------+---------+

Мне нужно объединить оба этих кадра данных в столбце геохэш. Я могу сделать наивное равное объединение наверняка, но фрейм данных users огромен, содержит миллиарды строк, и геохэш, скорее всего, повторится, внутри и между idvalues. Итак, мне было интересно, есть ли способ выполнения объединений уникальных геохешей в users dataframe и геохешах в shops dataframe. Если мы сможем это сделать, то легко скопировать записи магазинов для соответствующих геохешей в результирующем фрейме данных.

Вероятно, это может быть достигнуто с помощью pdf udf, где я бы выполнил групповую игру на users.idvalue, выполните объединение с shops в пределах udf, взяв только первую строку из группы (поскольку все идентификаторы в группе в любом случае одинаковы), и создав однорядный фрейм данных. Логически кажется, что это должно сработать, но не уверен точно в аспекте производительности, так как udf обычно медленнее, чем искровые нативные преобразования. Любые идеи приветствуются.

Ответы [ 2 ]

1 голос
/ 16 октября 2019

Вы сказали, что ваш пользовательский фрейм данных огромен и что "геошаши могут повторяться как внутри, так и между значениями". Однако вы не сослались на наличие дублированных геохешей в фрейме данных ваших магазинов.

Если в последнем нет повторных хэшей, я думаю, что простое соединение решит вашу проблему:

val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht")).toDF("shop_id","shop_name","geohash")

userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
|   id-1|          2|gcutjjn|
|   id-2|          2|gcutjjn|
|   id-1|          3|gcutjjn|
|   id-1|          5|gcutjht|
+-------+-----------+-------+

shopDf.show
+-------+---------+-------+
|shop_id|shop_name|geohash|
+-------+---------+-------+
|  sid-1|      kfc|gcutjjn|
|  sid-2|      mcd|gcutjhq|
|  sid-3|starbucks|gcutjht|
+-------+---------+-------+

shopDf
    .join(userDf,Seq("geohash"),"inner")
    .groupBy($"geohash",$"shop_id",$"idvalue")
    .agg(collect_list($"day_of_week").alias("days"))
    .show
+-------+-------+-------+------+
|geohash|shop_id|idvalue|  days|
+-------+-------+-------+------+
|gcutjjn|  sid-1|   id-1|[2, 3]|
|gcutjht|  sid-3|   id-1|   [5]|
|gcutjjn|  sid-1|   id-2|   [2]|
+-------+-------+-------+------+

Если у вас есть повторяющиеся значения хеш-функции в вашем фрейме данных магазинов, возможный подход будет состоять в том, чтобы удалить эти повторяющиеся хеш-значения из вашего фрейма данных магазинов (если ваши требования позволяют это), а затем выполнить ту же операцию соединения.

val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht"),("sid-4","burguer king","gcutjjn")).toDF("shop_id","shop_name","geohash")

userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
|   id-1|          2|gcutjjn|
|   id-2|          2|gcutjjn|
|   id-1|          3|gcutjjn|
|   id-1|          5|gcutjht|
+-------+-----------+-------+

shopDf.show
+-------+------------+-------+
|shop_id|   shop_name|geohash|
+-------+------------+-------+
|  sid-1|         kfc|gcutjjn|  <<  Duplicated geohash
|  sid-2|         mcd|gcutjhq|
|  sid-3|   starbucks|gcutjht|
|  sid-4|burguer king|gcutjjn|  <<  Duplicated geohash
+-------+------------+-------+

//Dataframe with hashes to exclude:
val excludedHashes = shopDf.groupBy("geohash").count.filter("count > 1")
excludedHashes.show
+-------+-----+
|geohash|count|
+-------+-----+
|gcutjjn|    2|
+-------+-----+

//Create a dataframe of shops without the ones with duplicated hashes
val cleanShopDf = shopDf.join(excludedHashes,Seq("geohash"),"left_anti")
cleanShopDf.show
+-------+-------+---------+
|geohash|shop_id|shop_name|
+-------+-------+---------+
|gcutjhq|  sid-2|      mcd|
|gcutjht|  sid-3|starbucks|
+-------+-------+---------+

//Perform the same join operation
cleanShopDf.join(userDf,Seq("geohash"),"inner")
    .groupBy($"geohash",$"shop_id",$"idvalue")
    .agg(collect_list($"day_of_week").alias("days"))
    .show
+-------+-------+-------+----+
|geohash|shop_id|idvalue|days|
+-------+-------+-------+----+
|gcutjht|  sid-3|   id-1| [5]|
+-------+-------+-------+----+

Предоставленный код был написан на Scala, но его можно легко преобразовать в Python.

Надеюсь, это поможет!

0 голосов
/ 16 октября 2019

Это идея, если возможно, что вы использовали pyspark SQL, чтобы выбрать отдельный геохэш и создать таблицу временных данных. Затем объединитесь из этой таблицы вместо фреймов данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...