Spark join: группировка записей, имеющих одинаковое значение для определенного столбца в одном разделе - PullRequest
2 голосов
/ 27 февраля 2020

У нас есть 2 таблицы Hive, которые читаются в spark и соединяются с помощью ключа соединения, назовем его user_id. Затем мы записываем этот объединенный набор данных в S3 и регистрируем его в качестве третьей таблицы для последующих задач, чтобы использовать этот объединенный набор данных. Один из других столбцов в объединенном наборе данных называется keychain_id.

Мы хотим сгруппировать все пользовательские записи, принадлежащие одному и тому же keychain_id, в один и тот же раздел, чтобы впоследствии избежать случайных перемешиваний. Итак, могу ли я сделать перераспределение («keychain_id») перед записью в s3 и регистрацией в Hive, и когда я прочитаю те же данные из этой третьей таблицы, будет ли она иметь ту же группу разделов (все пользователи, принадлежащие к одному и тому же keychain_id) в том же разделе)? Потому что каждый раз при чтении из этой 3-й таблицы стараемся не делать перераспределение («keychain_id»). Можете ли вы уточнить? Если нет гарантии, что при чтении он сохранит ту же группировку разделов, то есть ли другой эффективный способ сделать это, кроме кэширования?

1 Ответ

1 голос
/ 27 февраля 2020

если в keychain_id отсутствует перекос данных (приведет к размеру файла различий), вы можете сделать запись с помощью partitionBy:

 df.write\
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

Обновление:

Чтобы 'сохранить группировку пользовательских записей с одинаковым keychain_id в одном и том же разделе данных'

Вы можете перераспределить ранее, с уникальными идентификаторами и / или столбцами

from pyspark.sql import functions as F
n = df.select(F.col('keychain_id')).distinct().count()

df.repartition(n, F.col("keychain_id)\
 .write \
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")

 or 

df.repartition(n)\
 .write \
 .partitionBy("keychain_id")\
 .mode("overwrite")\
 .format("parquet")\
 .saveAsTable("testing")
...