У меня есть два файла Parquet, которые мне нужно прочитать, чтобы вычислить разницу в одном из столбцов, с именем data
.
Каждая строка в каждом из файлов может быть идентифицирована уникальным index
значение.
Я хотел бы оптимизировать расчет, убедившись, что каждому Spark Executor назначен одинаковый набор индексов x
, чтобы не требовалась дополнительная перестановка данных.
В настоящее время я использую partitionBy для записи файлов Parquet в соответствии со значениями index
:
import random
from pyspark.sql import *
def get_indices(n):
index_list = list(range(n))
random.shuffle(index_list)
return index_list
def get_df(name, n):
IndexNames = Row("index", "data" + "_" + name)
rows = []
for i in get_indices(n):
rows.append(IndexNames(i, name + '_' + str(i)))
return spark.createDataFrame(rows)
df_1 = get_df("one", 3)
df_2 = get_df("two", 3)
df_1.write.partitionBy('index').parquet('/tmp/krzsl/df_1.parquet')
df_2.write.partitionBy('index').parquet('/tmp/krzsl/df_2.parquet')
Пример использования следующий:
df_1_read = spark.read.parquet('/tmp/krzsl/df_1.parquet/')
df_2_read = spark.read.parquet('/tmp/krzsl/df_2.parquet/')
cond = [df_1_read.index == df_2_read.index]
joined_df = df_1_read.join(df_2_read, cond, 'inner').select(df_1_read.index, df_1_read.data_one, df_2_read.data_two)
display(joined_df)
IНе удается найти способ проверить, каково содержимое двух фреймов данных на каждом Исполнителе ( проверить вопрос здесь ), но является ли это правильным подходом, позволяющим избежать дополнительной перестановки данных?