У меня есть DataFrame source
и я хочу отфильтровать записи на основе условий в другом DataFrame с именем blacklist
.source
должен соответствовать хотя бы одной записи blacklist
для фильтрации.Условия / записи столбцов в blacklist
связаны AND
.Значение NULL
в blacklist
остается подстановочным знаком, это означает, что соответствующий атрибут может иметь любое значение, соответствующее условию.
Упрощенный пример:
source
:
| id | age | color |
|----|-----|-------|
| 1 | 28 | blue |
| 2 | 25 | blue |
| 3 | 15 | red |
| 4 | 20 | red |
| 5 | 27 | green |
| 6 | 30 | green |
blacklist
:
| age | color |
|------|-------|
| 25 | blue |
| NULL | red |
| 30 | NULL |
output
:
| id | age | color |
|----|-----|-------|
| 1 | 28 | blue |
| 5 | 27 | green |
Соответствующие кадры данных:
val source = Seq((1, 28, "blue"), (2, 25, "blue"), (3, 15, "red"), (4, 20, "red"), (5, 27, "green"), (6, 30, "green")).toDF("id", "age", "color")
val blacklist = Seq((Some(25), Some("blue")), (None, Some("red")), (Some(30), None)).toDF("age", "color")
Дополнительная информация о реальных данных:
- Данные хранятся в таблицах Hive (Формат ORC)
source
содержит 10 миллиардов записей blacklist
содержит 200 тысяч записей с 5 столбцами
Мой подход (с использованием Spark 2.3):
val joinCondition = (source("age") <=> blacklist("age") || blacklist("age").isNull) && (source("color") <=> blacklist("color") || blacklist("color").isNull)
val dataToRemove = source.join(broadcast(blacklist), joinCondition).select(source("id"), source("age"), source("color"))
val output = source.except(dataToRemove)
Проблема и вопрос:
Приведенный выше фрагмент кода работает.Тем не менее, у меня есть проблемы с производительностью в отношении продолжительности работы с реальными огромными данными.Видите ли вы лучший подход к этой проблеме черного списка?
Я также думал о создании условия большого фильтра в драйвере и просто выполните source.filter(theBigCondition)
.Это имеет то преимущество, что соединение не требуется.Тем не менее, даже с меньшим черным списком у меня возникали проблемы с Оптимизатором Catalyst.
Какие у вас идеи?