Трудно сказать, не видя ваших запросов и не понимая ваш набор данных. Я полагаю, вы не включили его или потому, что он очень сложный или чувствительный?Так что это немного выстрел в темноте, однако это выглядит как проблема, с которой мы столкнулись в моей команде на работе.Мое грубое предположение о том, что происходит, состоит в том, что во время одного из ваших соединений у вас есть ключевое пространство, которое имеет большую мощность, но очень неравномерное распределение.В нашем случае мы подключались к источникам веб-трафика, и хотя у нас есть тысячи возможных источников трафика, подавляющее большинство трафика исходит от нескольких.Это вызвало проблему, когда мы присоединились.Ключи будут равномерно распределены между исполнителями, однако, поскольку, возможно, 95% данных совместно используются, может быть, 3 или 4 ключа, очень небольшое количество исполнителей выполняли большую часть работы.Когда вы находите объединение, которое страдает от этого, нужно выбрать меньший из двух наборов данных и явно выполнить широковещательное соединение.(Обычно Spark будет пытаться это сделать, но не всегда точно сказать, когда это нужно.)
Для этого, скажем, у вас есть два DataFrames.В одном из них есть два столбца: number
и stringRep
, где число - всего одна строка для всех целых чисел из 0-10000
, а stringRep
- просто строковое представление этого значения, поэтому «один», «два», «три»"и т. д. Мы назовем это numToString
У другого DataFrame есть некоторый ключевой столбец для соединения с number
в numToString
, называемый kind
, некоторыми другими не относящимися к делу данными и 100 000 000 строк.Мы назовем этот DataFrame ourData
.Тогда давайте предположим, что распределение 100 000 000 строк в ourData
составляет 90% с kind == 1
, 5% с kind == 2
, а оставшиеся 5% довольно равномерно распределены между оставшимися 99,998 числами.Когда вы выполняете следующий код:
val numToString: DataFrame = loadNumToString()
val ourData: DataFrame = loadOurCode()
val joined = ourData.join(numToString).where(ourData("kind") === numToString("number"))
... очень вероятно, что Spark отправит% 90 данных (которые имеют kind == 1
) одному исполнителю,% 5 данных (то, что имеет kind == 2
) другому исполнителю, а оставшиеся% 5 размазаны по остальным, оставляя двух исполнителей с огромными разделами, а остальные с очень маленькими.
Способ, как я упоминал ранее,явно выполнить широковещательное соединение.Для этого нужно взять один DataFrame и полностью распределить его по каждому узлу.Таким образом, вы должны сделать это вместо этого:
val joined = ourData.join(broadcast(numToString)).where(ourData("kind") === numToString("number"))
... что бы отправить numToString
каждому исполнителю.Предполагая, что ourData
был предварительно равномерно разделен, данные должны оставаться равномерно распределенными по исполнителям.Это может быть не вашей проблемой, но это действительно похоже на проблему, с которой мы столкнулись.Надеюсь, это поможет!
Более подробную информацию о трансляциях можно найти здесь: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html