Spark executor не хватает памяти при соединении - PullRequest
0 голосов
/ 25 июня 2019

Привет! Я использую spark Mllib и выполняю приближение сходства между набором данных 1M и набором данных 1k.
Когда я делаю это, я транслирую 1k.
То, что я вижу, это то, что работа перестает идти вперед на втором-последнее задание.
Все исполнители мертвы, но тот, который продолжает работать очень долго, пока не достигнет нехватки памяти.
Я проверил ганглии, и он показывает, что память продолжает расти, пока не достигнет предела enter image description here

и дисковое пространство продолжает сокращаться до его завершения:
enter image description here
Действие, которое я вызвал, - запись, но оно делает то же самое сcount.
Теперь мне интересно: возможно ли, что все разделы в кластере сходятся только к одному узлу и создают это узкое место?

Вот мой фрагмент кода:

 var dfW = cookesWb.withColumn("n", monotonically_increasing_id())
var bunchDf = dfW.filter(col("n").geq(0) && col("n").lt(1000000) )
bunchDf.repartition(3000)
model.
approxSimilarityJoin(bunchDf,broadcast(cookesNextLimited),80,"EuclideanDistance").
withColumn("min_distance", min(col("EuclideanDistance")).over(Window.partitionBy(col("datasetA.uid")))
              ).
    filter(col("EuclideanDistance") === col("min_distance")).
  select(col("datasetA.uid").alias("weboId"),
    col("datasetB.nextploraId").alias("nextId"),
    col("EuclideanDistance")).write.format("parquet").mode("overwrite").save("approxJoin.parquet")

1 Ответ

1 голос
/ 25 июня 2019

Я постараюсь ответить как можно лучше. В Spark есть вещи, которые называются случайными операциями, и они делают именно то, что вы думали, после некоторых вычислений они переносят всю информацию в один узел. Если вы подумаете об этом, у этих операций не будет другого способа, если в итоге все данные не будут помещены в один узел.

пример для операции соединения: вам нужно разделить на 2 разных узла

partition 1:
s, 1
partition 2:
s, k

и вы хотите присоединиться к с. Если вы не получите обе строки на одном компьютере, будет невозможно рассчитать, что они должны быть объединены.

То же самое с подсчетом, уменьшением и многими другими операциями. Вы можете прочитать об операциях случайного воспроизведения или спросить меня, хотите ли вы получить дополнительные разъяснения.

Возможное решение для вас: вместо сохранения данных в памяти вы можете использовать что-то вроде:

dfW.persist(StorageLevel.MEMORY_AND_DISK_SER)

Существуют и другие варианты для сохранения, но в основном он сохраняет разделы и данные не только в памяти, но и на диске, а также в последовательном порядке для экономии места.

...