Spark: Будет ли перестановка данных в одном узле при перестановке при вызове groupBy? - PullRequest
0 голосов
/ 04 марта 2019

Предположим, у меня есть некоторые данные, которые находятся на одном и том же разделе (ранее я выполнял .coalesce(1) на кадре данных).Теперь я хотел бы сгруппировать данные и выполнить их агрегирование.Если бы я использовал .groupBy на фрейме данных, разместили бы группы на разных узлах?

Я бы хотел избежать этого, если это правда, так как я хотел бы выполнить эти вычисления для групп, не перетасовывая слишком много.

Ответы [ 2 ]

0 голосов
/ 04 марта 2019

Во-первых, coalesce(1) не гарантирует, что все ваши данные будут находиться в одном узле, поэтому для использования repartition(1) это приведет к объединению в одном узле всех ваших данных.coalesce группирует только разделы в одном узле, поэтому, если ваши данные распределены по 5 узлам (по несколько разделов в каждом), в конце останется 5 разделов.repartition принудительно перемешать, чтобы переместить все ваши данные в один узел.

Но, если вас беспокоит количество разделений в агрегациях, это зависит от того, является ли агрегация всего лишь reduce от всехваши данные, спарк SQL будет пытаться уменьшить сначала в каждом узле, а затем уменьшить результат каждого узла, примером будет подсчет.Но для агрегированных агрегированных данных, таких как подсчет количества элементов с идентификатором, спарк сначала сокращается в каждом узле, а затем перетасовывает данные в сегменты, чтобы убедиться, что все сокращения каждого узла для одного и того же идентификаторав том же узле, и уменьшите их снова.Количество сегментов настраивается с помощью свойства spark.sql.shuffle.partitions, и каждое из них будет выполнено как задание в вашей работе.Будьте осторожны, так как установка spark.sql.shuffle.partitions на один может замедлить работу других частей вашего процесса, таких как объединения или большие агрегации, или привести к ошибкам нехватки памяти.

0 голосов
/ 04 марта 2019

Это зависит.По умолчанию количество разделов определяется spark.sql.shuffle.partitions.Один из способов избежать этого - использовать repartition с явным выражением разделения вместо coalesce:

val df = sparkSession.createDataFrame(
  sparkContext.parallelize(Seq(Row(1, "a"), Row(1, "b"), Row(2, "c"))),
  StructType(List(StructField("foo", IntegerType, true), StructField("bar", StringType, true))))
df.repartition(numPartitions = 1, $"foo").groupBy("foo").agg(count("*")).explain()

В общем случае можно использовать веб-интерфейс Spark и отслеживать показатели случайного чтения / записи в "Этапы "вкладка.

...