Последовательные операции в фреймах данных pyspark - PullRequest
0 голосов
/ 06 мая 2020

У меня вопрос о том, как лучше всего справиться с преобразованиями фреймов данных (df). Предположим, у меня есть основной df, и мне нужно соединить этот df с другими 3 df. Какой из приведенных ниже способов является лучшим (более эффективным c) для этого? Создать несколько файлов dfs или переназначить существующий?

1 - Один фрейм данных для каждого шага

df = spark.read.orc(file)
df2 = spark.read.orc(file2)
df3 = spark.read.orc(file3)
df4 = spark.read.orc(file4)

df5 = df.join(df2, df.col==df2.col, 'inner')
df6 = df5.join(df3, df5.col==df3.col, 'inner')
df7 = df6.join(df4, df6.col==df4.col, 'inner')

df7.write.orc(file)

2 - Переназначить существующему

df = spark.read.orc(file)
df2 = spark.read.orc(file2)
df3 = spark.read.orc(file3)
df4 = spark.read.orc(file4)

df = df.join(df2, df.col==df2.col, 'inner')
df = df.join(df3, df.col==df3.col, 'inner')
df = df.join(df4, df.col==df4.col, 'inner')

df.write.orc(file)

Ответы [ 2 ]

0 голосов
/ 06 мая 2020

Сначала следует отметить несколько моментов:

DataFrame равно immutable, любое преобразование на нем создаст новый DataFrame.

Все преобразования в Spark: lazy.

Таким образом, присвоение или переназначение преобразованного Dataframe переменной не имеет никакого значения с точки зрения query execution plan.

Итак, когда вы, наконец, материализуете результирующий DataFrame (.write в данном случае), Spark создаст (и оптимизирует) единый план запроса со всеми 4 соединениями в нем. Так что на самом деле не имеет значения **, как вы go пишете преобразование.

** Но да, есть несколько вещей, которые вы можете учитывать, если вы хорошо знаете свои данные и думаете, что внутреннее соединение потеряет число записей значительно, тогда, возможно, вы сможете выполнить это соединение перед другими внутренними соединениями.

0 голосов
/ 06 мая 2020

Второй. DAG Spark достаточно умен, чтобы обнаруживать соединения.

Еще лучше, во втором подходе вместо того, чтобы назначать df несколько раз, вы можете просто сделать:

df = df.join(df2, df.col==df2.col, 'inner')
    .join(df3, df.col==df3.col, 'inner')
    .join(df4, df.col==df4.col, 'inner')
...