Как отфильтровать DataFrame по условиям в другом DataFrame в Spark - PullRequest
0 голосов
/ 05 декабря 2018

У меня есть DataFrame source и я хочу отфильтровать записи на основе условий в другом DataFrame с именем blacklist.source должен соответствовать хотя бы одной записи blacklist для фильтрации.Условия / записи столбцов в blacklist связаны AND.Значение NULL в blacklist остается подстановочным знаком, это означает, что соответствующий атрибут может иметь любое значение, соответствующее условию.

Упрощенный пример:

source:

| id | age | color |
|----|-----|-------|
| 1  | 28  | blue  |
| 2  | 25  | blue  |
| 3  | 15  | red   |
| 4  | 20  | red   |
| 5  | 27  | green |
| 6  | 30  | green |

blacklist:

| age  | color |
|------|-------|
| 25   | blue  |
| NULL | red   |
| 30   | NULL  |

output:

| id | age | color |
|----|-----|-------|
| 1  | 28  | blue  |
| 5  | 27  | green |

Соответствующие кадры данных:

val source = Seq((1, 28, "blue"), (2, 25, "blue"), (3, 15, "red"), (4, 20, "red"), (5, 27, "green"), (6, 30, "green")).toDF("id", "age", "color")
val blacklist = Seq((Some(25), Some("blue")), (None, Some("red")), (Some(30), None)).toDF("age", "color")

Дополнительная информация о реальных данных:

  • Данные хранятся в таблицах Hive (Формат ORC)
  • source содержит 10 миллиардов записей
  • blacklist содержит 200 тысяч записей с 5 столбцами

Мой подход (с использованием Spark 2.3):

val joinCondition = (source("age") <=> blacklist("age") || blacklist("age").isNull) && (source("color") <=> blacklist("color") || blacklist("color").isNull)
val dataToRemove = source.join(broadcast(blacklist), joinCondition).select(source("id"), source("age"), source("color"))                                                                                    
val output = source.except(dataToRemove)

Проблема и вопрос:

Приведенный выше фрагмент кода работает.Тем не менее, у меня есть проблемы с производительностью в отношении продолжительности работы с реальными огромными данными.Видите ли вы лучший подход к этой проблеме черного списка?

Я также думал о создании условия большого фильтра в драйвере и просто выполните source.filter(theBigCondition).Это имеет то преимущество, что соединение не требуется.Тем не менее, даже с меньшим черным списком у меня возникали проблемы с Оптимизатором Catalyst.

Какие у вас идеи?

Ответы [ 2 ]

0 голосов
/ 18 декабря 2018

Позвольте мне ответить на мой собственный вопрос.

В местных тестах я обнаружил, что except довольно дорого.Добавление типа флага к данным source и последующему фильтрованию по этим швам для ускорения.

val blacklistWithFlag = blacklist.withColumn("remove", lit(true))
val markedSource = source.join(broadcast(blacklistWithFlag), joinCondition, "left_outer").select(source("id"), source("age"), source("color"), blacklistWithFlag("remove"))
val output = markedSource.filter(col("remove").isNull).drop("remove")

Этот подход требует только 1 этап вместо 4 этапов с указанным выше.

0 голосов
/ 05 декабря 2018

Ваш подход объединения с трансляцией, вероятно, является лучшим подходом к этой проблеме.

Сначала попытайтесь понять, какая часть занимает так много времени.Вероятно, именно эта часть:

val joinedDf = data.join(broadcast(blacklist))

Так что моим первым подозреваемым будет перекос данных в 10 B кадре данных.А поскольку ваш черный DF настолько мал, в этом случае отлично подойдет «Соленое соединение».

Основа алгоритма:

Соленое соединение выполняется путем выбора числа 1-N.Чем вы умножаете каждую строку в меньшем DF на N. Для N=3:

черный список до:

| age  | color |
|------|-------|
| 25   | blue  |
| 30   | NULL  |

черный список после:

| salt | age  | color |
|------|------|-------|
|   1  | 25   | blue  |
|   2  | 25   | blue  |
|   3  | 25   | blue  |
|   1  | 25   | red   |
|   2  | 25   | red   |
|   3  | 25   | red   |

И дляЧем больше DF для каждой строки вы добавляете случайное число в диапазоне от 1 до N:

    | salt | age  | color |
    |------|------|-------|
    |   3  | 25   | blue  |
    |   2  | 27   | green |
    |   1  | 25   | blue  |
    |   3  | 45   | red   |

Чем вы добавляете столбец соли, чтобы стать частью объединения:

saltedData.join(brodcast(saltedBlacklist), Seq("salt","age","color"))

Теперь мы можем видетьчто в большом DF у нас есть дубликаты (25, синие), но поскольку они имеют различную соль, они будут распределены на большее количество машин.

Идея соленого соединения состоит в том, чтобы получить большую энтропию.Если в наших столбцах объединения будут действительно искаженные данные, распределение между работниками будет плохим.Добавляя соление, мы можем раздувать данные о малых df, умноженных на N, но мы получаем лучшее распределение, получая лучшую энтропию в наших новых столбцах соединения, которые теперь содержат столбец «соль».

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...