Почему случайная запись происходит только для одного исполнителя:
Пожалуйста, проверьте разделы RDD, следующий образ пользовательского интерфейса поможет вам найти
![enter image description here](https://i.stack.imgur.com/K88XO.png)
Я думаю, что ваш СДР имеет только один раздел вместо 8 или более, который в конечном итоге будет использовать всех исполнителей.
rdd = rdd.repartition(8)
ИзбеганиеShuffle "Меньше этапа, бегите быстрее
Shuffling - это процесс перераспределения данных между разделами (так называемое перераспределение), который может вызывать или не вызывать перемещение данных между процессами JVM или даже по проводам (междуисполнители на отдельных машинах).
По умолчанию, перестановка не меняет количество разделов, так как у вас есть только один раздел, который выглядит медленно.
Как избежать перемешивания:
Когда оба RDD имеют дублирующиеся ключи, объединение может привести к значительному увеличению размера данных. Возможно, лучше выполнить отдельную операцию или операцию combByKey, чтобы уменьшить ключпространство или использовать cogroup для обработки дубликатов ключей вместо создания полного перекрестного продукта.Используя интеллектуальное разбиение на этапе объединения, можно предотвратить повторное перемешивание в объединении (мы обсудим это подробно позже).
Если ключи не присутствуют в обоих RDDвы рискуете потерять ваши данные неожиданно.Может быть безопаснее использовать внешнее объединение, так что вы гарантированно сохраните все данные в левом или правом СДР, а затем отфильтруете данные после объединения.
Еслиодин RDD имеет некоторое легко определяемое подмножество клавиш, в другом вам лучше отфильтровать или уменьшить перед объединением, чтобы избежать большого перемешивания данных, которое вы в конечном итоге выбросите.
Чтобы объединить данные, Spark нужны данные, которые должны быть объединены (т. Е. Данные, основанные на каждом ключе), чтобы жить в одном разделе.Реализация объединения по умолчанию в Spark - это перемешанное хеш-соединение.Перемешанное хеш-соединение гарантирует, что данные в каждом разделе будут содержать одни и те же ключи, путем разбиения второго набора данных с тем же разделителем по умолчанию, что и первый, так что ключи с одинаковым значением хеш-функции из обоих наборов данных находятся в одном и том же разделе.Хотя этот подход всегда работает, он может быть дороже, чем необходимо, потому что он требует перемешивания.Перестановки можно избежать, если:
1. Оба RDD имеют известный разделитель.
- Один из наборов данных достаточно мал, чтобы поместиться в памяти, и в этом случае мы можем выполнить широковещательное хеш-соединение (мы объясним, что это будет позже).
Обратите внимание, что если RDD размещены в одном месте, можно избежать передачи по сети вместе с перемешиванием.Всегда сохраняться после перераспределения
Объединения DataFrame Объединение данных между DataFrames является одним из наиболее распространенных преобразований нескольких DataFrame.Все стандартные типы соединений SQL поддерживаются и могут быть указаны как joinType в df.join (otherDf, sqlCondition, joinType) при выполнении соединения.Как и в случае соединений между RDD, соединение с неуникальными ключами приведет к перекрестному произведению (поэтому, если в левой таблице есть R1 и R2 с ключом 1, а в правой таблице есть R3 и R5 с ключом 1, вы получите (R1, R3), (R1,R5), (R2, R3), (R2, R5)) на выходе.
Используя самосоединение и горит (true), вы можете получить декартово произведение вашегоНабор данных, который может быть полезен, но также иллюстрирует, как объединения (особенно самостоятельные объединения) могут легко привести к неработоспособным размерам данных.
Использование широковещательного соединения с широковещательным соединением позволяет очень эффективно объединить большую таблицу (факт) с относительно небольшими таблицами (измерениями), избегая отправки всех данных большой таблицы по сети. Вы можете использовать функцию широковещания дляпометить набор данных для трансляции при использовании в операторе соединения.Он использует настройку spark.sql.autoBroadcastJoinThreshold для управления размером таблицы, которая будет транслироваться на все рабочие узлы при выполнении объединения.
Использовать тот же разделитель.Если два RDD имеют одинаковый разделитель, объединение не вызовет тасования.Обратите внимание, однако, что отсутствие перемешивания не означает, что никакие данные не должны быть перемещены между узлами.Два RDD могут иметь один и тот же разделитель (быть совместно разделенным), но при этом иметь соответствующие разделы, расположенные на разных узлах (не располагаться совместно).Эта ситуация все же лучше, чем делать случайные, но об этом нужно помнить.Совместное размещение может улучшить производительность, но трудно гарантировать.
Если данные огромны и / или ваши кластеры не могут расти так, что даже приводит к OOM, используйте двухпроходный подход,Сначала заново разделите данные и сохраните их, используя многораздельные таблицы (dataframe.write.partitionBy ()).Затем последовательно соедините подразделения в цикле, «добавляя» к той же таблице окончательных результатов.
https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications
- https://medium.com/@foundev/you-won-t-believe-how-spark-shuffling-will-probably-bite-you-also-windowing-e39d07bf754e
- https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/ - https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-shuffle.html