Лучшая практика для написания в hadoop от spark - PullRequest
0 голосов
/ 24 апреля 2018

Я просматривал некоторый код, написанный коллегой, и нашел метод, подобный этому:

def writeFile(df: DataFrame,
              partitionCols: List[String],
              writePath: String): Unit {

    val df2 = df.repartition(partitionCols.get.map(col): _*)
    val dfWriter = df2.write.partitionBy(partitionCols.get.map(col): _*)
    dfWriter
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .option("compression", "snappy")
        .save(writePath)

}

Как правило, рекомендуется вызывать repartition для предварительно определенного набора столбцов, подобных этому, а затем вызывать partitionBy, а затем сохранять на диск?

Ответы [ 3 ]

0 голосов
/ 24 апреля 2018

Чтобы помочь вам понять различия между partitionBy() и repartition(), перераспределение на фрейме данных использует разделитель на основе хэша, который принимает COL, а также NumOfPartitions, на основе которого генерируется хеш-значение и группируются данные.

По умолчанию repartition() создает 200 разделов. Из-за возможности коллизий существует хорошая вероятность разделения нескольких записей с разными ключами на одни и те же сегменты.

С другой стороны, partitionBy() принимает COL, при котором разделы основаны исключительно на уникальных ключах. Разделы пропорциональны количеству уникальных ключей в данных:

В случае перераспределения есть хороший шанс записи пустых файлов. Но в случае partitionBy пустых файлов не будет.

0 голосов
/ 24 апреля 2018

Является ли ваша работа связанной с процессором, памятью, сетевым вводом-выводом или дисковым вводом-выводом?

Первые 2 случая значимы, если df2 достаточно велик, и другие ответы правильно решают эти случаи.

Если ваша работа связана с дисковым вводом-выводом (и в будущем вы часто пишете большие файлы в HDFS), многие облачные провайдеры позволят вам выбрать более быстрый SSD-диск за дополнительную плату.

Также Сэнди Риза рекомендует хранить --executor-cores ниже 5:

Я заметил, что у клиента HDFS есть проблемы с множеством параллельных потоков.Грубое предположение состоит в том, что максимум пять задач для каждого исполнителя могут обеспечить полную пропускную способность записи, поэтому лучше держать число ядер на исполнителя ниже этого числа.

0 голосов
/ 24 апреля 2018

Как правило, вы вызываете repartition с теми же столбцами, что и partitionBy, чтобы иметь один файл паркета в каждом разделе. Это достигается здесь. Теперь вы можете утверждать, что это может означать, что размер файла паркета становится больше или хуже может вызвать переполнение памяти.

Эта проблема обычно решается добавлением row_number в Dataframe, а затем указанием количества документов, которое может иметь каждый файл паркета. Что-то вроде

val repartitionExpression =colNames.map(col) :+ floor(col(RowNumber) / docsPerPartition)
// now use this to repartition 

Чтобы ответить на следующую часть как persist after partitionBy, которая здесь не нужна, так как после раздела она напрямую записывается на диск.

...