Spark 2.3 Dataframe partition-хотите разделить данные по ключу на n номеров разделов - PullRequest
0 голосов
/ 13 марта 2020

Мне нужна помощь в разбиении фрейма данных в spark (scala). Мне нужно разделить по ключевому столбцу на n количество разделов, при этом все строки, относящиеся к одному и тому же ключу, должны находиться в одном разделе (т. Е. Ключ не должен распределяться по разделу)

Примечание. Мой ключ может содержать миллионы

Пример: Предположим, у меня есть ниже датафрейм

enter image description here

и т. Д.

Как вы можете видеть много значений акций тот же ключ. Я хочу разбить этот набор данных на «n» количество разделов, где один и тот же ключ должен быть в одном разделе, а ключи не должны распределяться по разделам. Несколько ключей могут быть в одном разделе и ключи не сортируются.

Заранее спасибо

Ответы [ 3 ]

0 голосов
/ 13 марта 2020

Вы можете разбить DataFrames при записи их в файл на основе вывода. Что-то вроде:

df.write.partitionBy("colName").format("parquet").save(path-to-file)

Это создаст структуру каталогов как в вашем

path
└── to
    └── file
         ├── colName=value1
                       └── data.parquet
         ├── colName=value2
                       └── data.parquet

Когда вы загружаете данные и фильтры, предикаты будут переданы в исходный файл, и вы сможете получить преимущества в производительности от разбиения

Разве это не то, что вы ищете?

0 голосов
/ 16 марта 2020

попробуй

def repartition(partitionExprs: org.apache.spark.sql.Column*): org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] 

val df = Seq(("aa","vv"),("aa","v1v1"),("a1","v2")).toDF("Key","Value")
 val partionedDf = df.repartition(col("Key"))

0 голосов
/ 13 марта 2020

Согласно книге Spark - The Definitive Guide Spark имеет два встроенных разделителя: HashPartitioner для дискретных значений и RangePartitioner. Оба работают для дискретных и непрерывных значений.

Пример HashPartitioner:

import org.apache.spark.HashPartitioner

val rdd = df.rdd // convert DataFrame to low-level RDD
val keyedRDD = rdd.keyBy(...) // define your custom key
keyedRDD.partitionBy(new HashPartitioner(n))

Пример разделителя:

import org.apache.spark.Partitioner

class DomainParitioner extends Partitioner {
  def numPartitions = n
  def getPartition(key: Any): Int = {
    // your custome partition logic
  }
}

keyedRDD.partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSeq.length)

В книге также упоминается, что вам следует остерегаться key skew, что означает, что некоторые ключи могут иметь много-много больше значений, чем другие ключи. Вы хотите максимально разбить эти ключи, чтобы улучшить параллелизм и предотвратить OutOfMemoryErrors во время выполнения.

...