Оптимизация объединения в случае несбалансированных наборов данных - PullRequest
0 голосов
/ 23 ноября 2018

У меня есть два комплекта, к которым можно присоединиться ВЛЕВО:

Набор данных A: ~ 10000 файлов паркета каждые 300 КБ

Набор данных B: ~ 50000 файлов паркета каждый 30 МБ

Я хочу присоединиться к строковому столбцу, который является общим в обоих наборах данных, скажем «имя».

Одна важная вещь - каждая строка в наборе данных A имеет соответствие в наборе данных B. Но набор данных B содержит много других строк.

Обычная функция соединения занимает очень много времени и в большинстве случаев дает сбой.Поэтому я спрашиваю, может ли быть оптимизация?Например, является ли хорошей идеей разбиение набора данных B в алфавитном порядке по столбцу «имя»?Соединение с широковещательной рассылкой не будет работать, поскольку набор данных A недостаточно мал.

1 Ответ

0 голосов
/ 23 ноября 2018

Если вы можете вставить в файл ваши файлы перед присоединением, это, вероятно, лучше.В противном случае вам потребуется еще один шаг записи для использования группирования.

df_A.write.format('parquet')
...     .bucketBy(10, 'name')
...     .mode("overwrite")
...     .saveAsTable('bucketed_table_A'))

df_B.write.format('parquet')
...     .bucketBy(10, 'name')
...     .mode("overwrite")
...     .saveAsTable('bucketed_table_B'))

Bucketing позволяет предварительно перемешать данные.И dataframa_A, и datafram_B должны иметь одинаковое количество сегментов.Выбор номера ведра является сложным «искусством» и зависит от ваших данных и вашей конфигурации.

Затем вы читаете свои данные в пакетах и ​​присоединяете их к «имени».

spark.table('bucketed_table_A').join(
    spark.table('bucketed_table_B'),
    on='name',
    how='left'
)

Делая это, вы переводите время вычислений из шага соединения в шаг записи / вставки.Но сделайте это один раз, и тогда вы сможете использовать его много раз.

...