Как подсказал @Tzach, лучше избегать использования bar.repartition(1)
и bar.cache()
, так как набор данных bar кажется слишком большим, чтобы поместиться в памяти. Вы можете использовать кеш для небольшого набора данных или даже лучше, чтобы попытаться передать его каждому исполнителю. Также, если вы знаете размер большого набора данных, вы можете рассчитать номер раздела с помощью partition_num = total_size / 500MB
, 250-500 МБ - это идеальный размер каждого раздела, поэтому, если ваши данные имеют размер 10 ГБ, это должно быть 10 ГБ / 500 МБ = 20 разделов.
Вот ваш код после упомянутых изменений:
foo.cache() //feel free to cache the small dataset
bar.repartition(partitions_num) //this is optional
foo.select("col_1").except(bar.select("col_1"))
Также вы можете попробовать использовать left_anti вместо этого, как показано ниже, и сравнить их производительность:
foo.join(bar, foo("col_1") === bar("col_1"), "left_anti").show
Это исключит все записи из foo, для которых col_1 существует в баре.
Если ваши требования требовали иного, исключая записи из bar, которые существуют в foo, тогда ваша программа могла бы быть еще более эффективной, передавая небольшой набор данных foo, как в следующем фрагменте кода:
import org.apache.spark.sql.functions.broadcast
bar.join(broadcast(foo), bar("col1") === foo("col1"), "left_anti").show
Удачи!