Эффективное присоединение к pyspark - PullRequest
0 голосов
/ 28 ноября 2018

Я много читал о том, как делать эффективные объединения в pyspark.Я нашел следующие способы достижения эффективных объединений:

  • Используйте широковещательное объединение, если можете.( Я обычно не могу , потому что кадры данных слишком велики)
  • Рассмотрите возможность использования очень большого кластера.(Я бы предпочел не из-за $$$ ).
  • Использовать тот же разделитель .

Последним являетсяЯ бы хотел попробовать, но я не могу найти способ сделать это в pyspark.Я пробовал:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])

но это не помогает, это все еще занимает слишком много времени, пока я не остановлю его, потому что искры запутались в последних нескольких заданиях.

Итак,Как я могу использовать один и тот же разделитель в pyspark и ускорить мои объединения, или даже избавиться от перемешиваний, которые занимают вечность?Какой код мне нужно использовать?

PD : я проверил другие статьи, даже на stackoverflow , но я все еще не вижу код.

Ответы [ 2 ]

0 голосов
/ 10 января 2019

Спасибо @vikrantrana за ваш ответ, я постараюсь, если он мне когда-нибудь понадобится.Я говорю это потому, что обнаружил проблема не в «больших» объединениях , а в количестве вычислений до объединения.Представьте себе такой сценарий:

Я читаю таблицу и сохраняю данные в кадре с именем df1.Я читаю другую таблицу и храню ее в df2.Затем я выполняю огромное количество вычислений и присоединяюсь к обоим, и в итоге получаю соединение между df1 и df2.Проблема здесь была не в размере, а в том, что план выполнения spark был огромным, и он не мог поддерживать все промежуточные таблицы в памяти, поэтому он начал записывать на диск, и это заняло столько времени.

Решение, которое работало для меня, состояло в том, чтобы сохранить df1 и df2 на диске до объединения (я также сохранил другие промежуточные кадры данных, которые были результатом больших и сложных вычислений).

0 голосов
/ 11 декабря 2018

Вы также можете использовать двухпроходный подход, если он соответствует вашим требованиям. Сначала переразбейте данные и сохраните их, используя многораздельные таблицы (dataframe.write.partitionBy ()).Затем последовательно соединяйте подразделения в цикле, «добавляя» к той же таблице окончательных результатов.Это было хорошо объяснено Симом.см. ссылку ниже

двухпроходный подход для объединения больших фреймов данных в pyspark

на основе описанного выше случая. Мне удалось последовательно объединить подразделения в цикле, а затем сохранитьобъединенные данные в таблицу улья.

Вот код.

from pyspark.sql.functions import *
emp_df_1.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_1")
emp_df_2.withColumn("par_id",col('emp_id')%5).repartition(5, 'par_id').write.format('orc').partitionBy("par_id").saveAsTable("UDB.temptable_2")

Итак, если вы объединяетесь в целое число emp_id, вы можете разделить по идентификатору по модулю какое-то число, и таким образом выМожно перераспределить нагрузку между разделами спарк и записи, имеющие одинаковые ключи, будут сгруппированы и размещены на одном разделе.Затем вы можете читать и циклически просматривать данные каждого подраздела, объединять оба кадра данных и сохранять их вместе.

counter =0;
paritioncount = 4;
while counter<=paritioncount:
    query1 ="SELECT * FROM UDB.temptable_1 where par_id={}".format(counter)
    query2 ="SELECT * FROM UDB.temptable_2 where par_id={}".format(counter)
    EMP_DF1 =spark.sql(query1)
    EMP_DF2 =spark.sql(query2)
    df1 = EMP_DF1.alias('df1')
    df2 = EMP_DF2.alias('df2')
    innerjoin_EMP = df1.join(df2, df1.emp_id == df2.emp_id,'inner').select('df1.*')
    innerjoin_EMP.show()
    innerjoin_EMP.write.format('orc').insertInto("UDB.temptable")
    counter = counter +1

Я пробовал это, и это работает нормально.Это всего лишь пример демонстрации двухпроходного подхода.Ваши условия присоединения могут отличаться, и количество разделов также зависит от размера ваших данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...