Данные искажают, в первую очередь, проблему при применении операций неисключающего ключа (тасования).Два наиболее распространенных примера:
- Невосстанавливающий
groupByKey
(RDD.groupByKey
, Dataset.groupBy(Key).mapGroups
, Dataset.groupBy.agg(collect_list)
). RDD
и Dataset
joins
.
В редких случаях проблема связана со свойствами ключа разделения и функции разделения, при этом отсутствует существующая проблема с распределением данных.
// All keys are unique - no obvious data skew
val rdd = sc.parallelize(Seq(0, 3, 6, 9, 12)).map((_, None))
// Drastic data skew
rdd.partitionBy(new org.apache.spark.HashPartitioner(3)).glom.map(_.size).collect
// Array[Int] = Array(5, 0, 0)
Какие шаги мы предпримем (перераспределение, объединение и т. Д.)?
Перераспределение (никогда coalesce
) может помочь вам в последнем случае на
- Смена разделителя.
- Настройка количества разделов для минимизации возможного влияния данных (здесь вы можете использовать те же правила, что и для ассоциативных массивов - следует отдавать предпочтение простому числу и степеням двух, хотя это может не решить проблему полностью, например, 3 впример, использованный выше).
В предыдущих случаях обычно перераспределение не принесет больших результатов, поскольку перекос естественным образом вызывается самой операцией.Значения с одним и тем же ключом не могут быть распределены по нескольким разделам, и исходный тип распределения данных минимально влияет на невосстанавливающий характер процесса.
Эти случаи необходимо обрабатывать путем корректировки логики вашего приложения.На практике это может означать несколько вещей, в зависимости от данных или проблемы:
Нужно ли нам убивать задание, а затем включать в файл jar решения для перекоса и повторно отправлять задание?
Обычно вам необходимо хотя бы повторно отправить задание с настройками параметров.
В некоторых случаях (в основном RDD
пакетных заданий) вы можете спроектировать свое приложение для мониторинга выполнения задач, а также для уничтожения и повторной отправки определенных заданий в случае возможного перекоса, но это может быть сложно реализовать правильно на практике.
В общем случае, если возможен перекос данных, вы должны спроектировать свое приложение так, чтобы оно было защищено от перекосов данных.
Можем ли мы решить эту проблему, выполнив такие команды, как (coalesce) напрямуюиз оболочки, не убивая работу?
Я полагаю, что на это уже отвечают вышеприведенные пункты, но просто скажу - такой опции нет в Spark.Вы, конечно, можете включить их в свое заявление.