Вы также можете использовать двухпроходный подход, если он соответствует вашим требованиям. Сначала переразбейте данные и сохраните их, используя многораздельные таблицы (dataframe.write.partitionBy ()).Затем последовательно соединяйте подразделения в цикле, «добавляя» к той же таблице окончательных результатов.Это было хорошо объяснено Симом.см. ссылку ниже
двухпроходный подход для объединения больших фреймов данных в pyspark
на основе описанного выше случая. Мне удалось последовательно объединить подразделения в цикле, а затем сохранитьобъединенные данные в таблицу улья.
Вот код.
from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")
Итак, если вы объединяетесь в целое число emp_id, вы можете разделить по идентификатору по модулю какое-то число, и таким образом выМожно перераспределить нагрузку между разделами спарк и записи, имеющие одинаковые ключи, будут сгруппированы и размещены на одном разделе.Затем вы можете читать и циклически просматривать данные каждого подраздела, объединять оба кадра данных и сохранять их вместе.
counter =0;
paritioncount = 4;
while counter<=paritioncount:
query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
EMP_DF1 =spark.sql(query1)
EMP_DF2 =spark.sql(query2)
df1 = EMP_DF1.alias('df1')
df2 = EMP_DF2.alias('df2')
innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
innerjoin_EMP.show()
innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
counter = counter +1
Я пробовал это, и это работает нормально.Это всего лишь пример демонстрации двухпроходного подхода.Ваши условия присоединения могут отличаться, и количество разделов также зависит от размера ваших данных.