Spark's Shuffle Sort Merge Join. Один DataFrame разделен на сегменты. Пользуется ли этим Spark? - PullRequest
0 голосов
/ 06 августа 2020

По работе с RDD я помню, что если один RDD ключ-значение (rdd1) имеет известное разбиение, то выполнение соединения с другим, несекционированным RDD ключ-значение (rdd2) даст преимущества в производительности. Это связано с тем, что 1) по сети необходимо будет передавать только данные rdd2 и 2) каждый элемент rdd2 нужно будет передать только одному узлу, а не всем, путем применения разделения ключа rdd1 к key of rdd2

Я изучаю объединение слиянием с произвольной сортировкой и фреймами данных. Пример в книге, которую я читаю (Learning Spark, 2nd Edition), предназначен для объединения двух DataFrames на основе столбцов user_id. В этом примере делается попытка продемонстрировать исключение этапа Exchange из операции объединения, поэтому перед объединением оба DataFrames разделены на равное количество сегментов столбцом, который будет объединен.

Мой вопрос есть, что произойдет, если только один из DataFrames был добавлен в сегменты? Ясно, что этап обмена снова появится. Но если мы знаем, что DataFrame1 разделен на N сегментов столбцом, к которому мы хотим присоединиться, будет ли Spark использовать эту информацию сегментирования для эффективной передачи строк DataFrame2 по сети, как в случае RDD? Может ли Spark оставить строки DataFrame1 там, где они есть, и просто применить идентичное ведение к DataFrame2? (Предполагая, что N сегментов приводит к разумному количеству данных в разделах, которые должны быть объединены исполнителями) Или вместо этого Spark неэффективно перемешивает оба DataFrames?

В частности, я могу представить ситуацию, когда у меня единственный «главный» DataFrame, с которым мне нужно будет выполнить множество независимых соединений с другими дополнительными DataFrame в том же столбце. Разумеется, необходимо предварительно создать сегмент только для главного DataFrame, чтобы увидеть преимущества в производительности для всех объединений? (Хотя, я думаю, не повредило бы и дополнительные DataFrames, если бы вы потрудились собрать их в корзину)

1 Ответ

1 голос
/ 06 августа 2020

https://kb.databricks.com/data/bucketing.html Это все объясняет с некоторыми приукрашиваниями по сравнению с исходными сообщениями, которые я резюмирую.

Итог:

val t1 = spark.table("unbucketed")
val t2 = spark.table("bucketed")
val t3 = spark.table("bucketed")

Без пряжки - соединение с интервалами. Обе стороны необходимо переразбить.

t1.join(t2, Seq("key")).explain()

Открепление с переразбивкой - соединение с интервалами. Незакрепленная сторона правильно перераспределена, и требуется только одна перетасовка.

t1.repartition(16, $"key").join(t2, Seq("key")).explain()

Открепленная с неправильным переразбиваниемg (по умолчанию (200) - соединение с разделением с шагом. необходимы.

t1.repartition($"key").join(t2, Seq("key")).explain()

с разделением - с разделением на сегменты. Идеальный случай, обе стороны имеют одинаковое разделение, а тасование не требуется .

t3.join(t2, Seq("key")).explain()

Таким образом, обе стороны нуждаются в одном ведре для оптимальной производительности.

...