Идентификация записей, присутствующих в одном столбце данных, но не присутствующих в другом столбце данных - PullRequest
1 голос
/ 01 мая 2019

Я пытаюсь выбрать все элементы, присутствующие в foo, но не присутствующие в баре.Я использую этот код:

foo.repartition(1)
foo.cache()
bar.repartition(1)
bar.cache()
foo.select("col_1").except(bar.select("col_1"))

Есть ли лучший или более быстрый способ сделать это?В настоящее время это занимает более 15 минут при работе в кластере.

Дополнительная информация: foo будет содержать около 100-1000 элементов.бар будет иметь 40 миллионов + элементов.foo - это фрейм данных, состоящий из данных, считанных из таблицы кустов (50 столбцов) с использованием Spark SQL.bar - это фрейм данных, состоящий из данных, считанных из таблицы куду (250 столбцов) с использованием KuduContext.

Использование Spark 2.2 на CDH 5.15.x с Scala 2.11.8.

1 Ответ

2 голосов
/ 02 мая 2019

Как подсказал @Tzach, лучше избегать использования bar.repartition(1) и bar.cache(), так как набор данных bar кажется слишком большим, чтобы поместиться в памяти. Вы можете использовать кеш для небольшого набора данных или даже лучше, чтобы попытаться передать его каждому исполнителю. Также, если вы знаете размер большого набора данных, вы можете рассчитать номер раздела с помощью partition_num = total_size / 500MB, 250-500 МБ - это идеальный размер каждого раздела, поэтому, если ваши данные имеют размер 10 ГБ, это должно быть 10 ГБ / 500 МБ = 20 разделов.

Вот ваш код после упомянутых изменений:

foo.cache() //feel free to cache the small dataset
bar.repartition(partitions_num) //this is optional

foo.select("col_1").except(bar.select("col_1"))

Также вы можете попробовать использовать left_anti вместо этого, как показано ниже, и сравнить их производительность:

foo.join(bar, foo("col_1") === bar("col_1"), "left_anti").show

Это исключит все записи из foo, для которых col_1 существует в баре.

Если ваши требования требовали иного, исключая записи из bar, которые существуют в foo, тогда ваша программа могла бы быть еще более эффективной, передавая небольшой набор данных foo, как в следующем фрагменте кода:

import org.apache.spark.sql.functions.broadcast

bar.join(broadcast(foo), bar("col1") === foo("col1"), "left_anti").show

Удачи!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...