Оптимальный способ расчета разницы между значениями столбцов в двух кадрах данных Spark, считанных из файлов Parquet - PullRequest
0 голосов
/ 08 февраля 2019

У меня есть два файла Parquet, которые мне нужно прочитать, чтобы вычислить разницу в одном из столбцов, с именем data.

Каждая строка в каждом из файлов может быть идентифицирована уникальным indexзначение.

Я хотел бы оптимизировать расчет, убедившись, что каждому Spark Executor назначен одинаковый набор индексов x, чтобы не требовалась дополнительная перестановка данных.

В настоящее время я использую partitionBy для записи файлов Parquet в соответствии со значениями index:

import random
from pyspark.sql import *

def get_indices(n):
    index_list = list(range(n))
    random.shuffle(index_list)
    return index_list

def get_df(name, n):
    IndexNames = Row("index", "data" + "_" + name)
    rows = []
    for i in get_indices(n):
        rows.append(IndexNames(i, name + '_' + str(i)))
    return spark.createDataFrame(rows)

df_1 = get_df("one", 3)
df_2 = get_df("two", 3)

df_1.write.partitionBy('index').parquet('/tmp/krzsl/df_1.parquet')
df_2.write.partitionBy('index').parquet('/tmp/krzsl/df_2.parquet')

Пример использования следующий:

df_1_read = spark.read.parquet('/tmp/krzsl/df_1.parquet/')
df_2_read = spark.read.parquet('/tmp/krzsl/df_2.parquet/')
cond = [df_1_read.index == df_2_read.index]
joined_df = df_1_read.join(df_2_read, cond, 'inner').select(df_1_read.index, df_1_read.data_one, df_2_read.data_two)
display(joined_df)

IНе удается найти способ проверить, каково содержимое двух фреймов данных на каждом Исполнителе ( проверить вопрос здесь ), но является ли это правильным подходом, позволяющим избежать дополнительной перестановки данных?

...