Apache Spark, диапазон соединений, перекос данных и производительность - PullRequest
0 голосов
/ 01 апреля 2019

У меня есть следующий предикат соединения Apache Spark SQL:

t1.field1 = t2.field1 and t2.start_date <= t1.event_date and t1.event_date < t2.end_date

данные:

t1 DataFrame have over 50 millions rows
t2 DataFrame have over 2 millions rows

почти все t1.field1 поля в t1 DataFrame имеют одинаковое значение (null).

В данный момент кластер Spark более 10 минут висит на одной задаче, чтобы выполнить это соединение, и из-за перекоса данных. В данный момент работает только один работник и одно задание для этого работника. Все остальные 9 рабочих бездействуют. Как улучшить это объединение, чтобы распределить нагрузку от этой конкретной задачи на весь кластер Spark?

Ответы [ 2 ]

2 голосов
/ 01 апреля 2019

Я предполагаю, что вы выполняете внутреннее соединение.

Ниже приведены шаги для оптимизации соединения - 1. Перед объединением мы можем отфильтровать t1 и t2 на основе наименьшего или наибольшего значения start_date, event_date, end_date.Это уменьшит количество рядов.

Проверьте, имеет ли набор данных t2 нулевое значение для field1, если не перед объединением, набор данных t1 может быть отфильтрован на основе условия notNull.Это уменьшит размер t1

Если ваша работа получает только несколько исполнителей, чем имеющийся, то у вас меньше разделов.Просто перераспределите набор данных, установите оптимальное число, чтобы не допустить большого количества разделов или наоборот.

Вы можете проверить правильность разбиения (без асимметрии), посмотрев время выполнения задач, оно должно быть похожим.

Проверить, еслименьший набор данных может быть помещен в память исполнителей, можно использовать broadcast_join.

Возможно, вы захотите прочитать - https://github.com/vaquarkhan/Apache-Kafka-poc-and-notes/wiki/Apache-Spark-Join-guidelines-and-Performance-tuning

1 голос
/ 02 апреля 2019

Я полагаю, что spark уже установил ненулевой фильтр на t1.field1, вы можете проверить это в плане объяснения.

Я бы предпочел поэкспериментировать с созданием дополнительного атрибута, который можно использовать как уравнение.-соединение состояния, например, ведро.Например, вы можете создать атрибут month.Для этого вам нужно будет перечислить months в t2, это обычно делается с использованием UDF.См. Этот SO-вопрос для примера: Как улучшить скорость соединения при соединении с условием между в Spark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...