Проблема, с которой вы сталкиваетесь, связана с тем, как spark принимает инструкции, которые вы им рассказываете, и преобразует их в реальные вещи, которые он собирается выполнять. Сначала нужно понять ваши инструкции, запустив Analyzer, затем он пытается улучшить их, запустив свой оптимизатор. Эта настройка применима к обоим.
В частности, ваш код взрывается во время шага в Анализаторе. Анализатор отвечает за выяснение того, к чему вы на самом деле обращаетесь, когда вы обращаетесь к вещам. Например, сопоставление имен функций с реализациями или сопоставление имен столбцов по переименованиям и различные преобразования. Он делает это за несколько проходов, разрешая дополнительные вещи за каждый проход, затем снова проверяя, может ли он разрешить перемещение.
Я думаю, что происходит в вашем случае, каждый проход, вероятно, разрешает один столбец, но 100 проходов не выполняется ' Достаточно, чтобы решить все столбцы. Увеличивая его, вы даете ему достаточно пропусков, чтобы иметь возможность полностью выполнить ваш план. Это, безусловно, красный флаг для потенциальной проблемы с производительностью, но если ваш код работает, вы можете просто увеличить значение и не беспокоиться об этом.
Если это не сработает, вам, вероятно, придется попробовать что-то предпринять, чтобы уменьшить количество столбцов, используемых в вашем плане. Возможно объединение всех столбцов в один закодированный строковый столбец в качестве ключа. Вы могли бы извлечь пользу из контрольной точки данных перед выполнением объединения, чтобы вы могли сократить свой план.
РЕДАКТИРОВАТЬ:
Кроме того, я бы реорганизовал ваш код выше, чтобы вы могли сделать все это только с одним соединением. Это должно быть намного быстрее и может решить вашу другую проблему.
Каждое объединение приводит к случайному перемешиванию (передача данных между вычислительными узлами), которое добавляет время к вашей работе. Вместо того, чтобы вычислять, добавлять, удалять и изменять независимо, вы можете сделать их все сразу. Что-то вроде приведенного ниже кода. Это в scala коде psuedo, потому что я знаком с этим больше, чем Java API.
import org.apache.spark.sql.functions._
var oldDf = ..
var newDf = ..
val changeCols = newDf.columns.filter(_ != "id").map(col)
// Make the columns you want to compare into a single struct column for easier comparison
newDf = newDF.select($"id", struct(changeCols:_*) as "compare_new")
oldDf = oldDF.select($"id", struct(changeCols:_*) as "compare_old")
// Outer join on ID
val combined = oldDF.join(newDf, Seq("id"), "outer")
// Figure out status of each based upon presence of old/new
// IF old side is missing, must be an ADD
// IF new side is missing, must be a DELETE
// IF both sides present but different, it's a CHANGE
// ELSE it's NOCHANGE
val status = when($"compare_new".isNull, lit("add")).
when($"compare_old".isNull, lit("delete")).
when($"$compare_new" != $"compare_old", lit("change")).
otherwise(lit("nochange"))
val labeled = combined.select($"id", status)
На данный момент у нас есть каждый идентификатор, помеченный как ADD / DELETE / CHANGE / NOCHANGE, поэтому мы можем просто groupBy / кол. Эту агг можно сделать почти полностью на стороне карты, поэтому она будет намного быстрее, чем объединение.
labeled.groupBy("status").count.show