как работает "обмен хэш-секционированием" в spark - PullRequest
0 голосов
/ 16 января 2019

У меня есть набор данных, который я хочу записать, отсортированный в файлы паркета, чтобы впоследствии получить выгоду от запроса этих файлов через Spark, включая Predicate Pushdown.

В настоящее время я использовал перераспределение по столбцам и количество разделов для перемещения данных в конкретный раздел. Столбец идентифицирует соответствующий раздел (от 0 до (фиксированный) n). В результате scala / spark генерирует неожиданный результат и создает меньше разделов (некоторые из них пусты). Может быть, хэш-коллизия?

Для решения проблемы я попытался выяснить причину и попытался найти обходные пути. Я нашел один обходной путь, преобразовав фрейм данных в rdd и используя partitionBy с HashPartitioner. Удивительно для меня: я получил ожидаемые результаты. Но преобразование кадра данных в RDD не является для меня решением, поскольку требует слишком много ресурсов.

Я протестировал эту среду на

  • SPARK 2.0 на cloudera CDH 5.9.3

  • SPARK 2.3.1 на emr-5.17.0

Вот мои тесты с выводами. Пожалуйста, используйте Spark-shell, чтобы запустить их

    scala> import org.apache.spark.HashPartitioner
    import org.apache.spark.HashPartitioner

    scala> val mydataindex = Array(0,1, 2, 3,4)
    mydataindex: Array[Int] = Array(0, 1, 2, 3, 4)

    scala> val mydata = sc.parallelize(for {
         |  x <- mydataindex
         |  y <- Array(123,456,789)
         | } yield (x, y), 100)
    mydata: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

    scala> val rddMyData = mydata.partitionBy(new HashPartitioner(5))
    rddMyData: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:26

    scala> val rddMyDataPartitions =   rddMyData.mapPartitionsWithIndex{
         |                 (index, iterator) => {
         |                    val myList = iterator.toList
         |                    myList.map(x => x + " -> " + index).iterator
         |                 }
         |              }
    rddMyDataPartitions: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:26

    scala>
         | // this is expected:

    scala> rddMyDataPartitions.take(100)
    res1: Array[String] = Array((0,123) -> 0, (0,456) -> 0, (0,789) -> 0, (1,123) -> 1, (1,456) -> 1, (1,789) -> 1, (2,123) -> 2, (2,456) -> 2, (2,789) -> 2, (3,456) -> 3, (3,789) -> 3, (3,123) -> 3, (4,789) -> 4, (4,123) -> 4, (4,456) -> 4)

    scala> val dfMyData = mydata.toDF()
    dfMyData: org.apache.spark.sql.DataFrame = [_1: int, _2: int]

    scala> val dfMyDataRepartitioned = dfMyData.repartition(5,col("_1"))
    dfMyDataRepartitioned: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: int, _2: int]

    scala> dfMyDataRepartitioned.explain(false)
    == Physical Plan ==
    Exchange hashpartitioning(_1#3, 5)
    +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
       +- Scan ExternalRDDScan[obj#2]

    scala> val dfMyDataRepartitionedPartition  = dfMyDataRepartitioned.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").count()
    dfMyDataRepartitionedPartition: org.apache.spark.sql.DataFrame = [partition_id: int, count: bigint]

    scala> // this is unexpected, because 1 partition has more indexes

    scala> dfMyDataRepartitionedPartition.show()
    +------------+-----+
    |partition_id|count|
    +------------+-----+
    |           1|    6|
    |           3|    3|
    |           4|    3|
    |           2|    3|
    +------------+-----+

Сначала я подумал, что HashPartitioner используется в методе перераспределения фрейма данных, но, похоже, это не так, потому что он работает с RDD.

Может кто-нибудь подсказать мне, как работает этот "хэш-разделение Exchange" (см. Вывод объяснения выше)?

2019-01-16 12:20: Это не дубликат Как работает HashPartitioner? , потому что меня интересует алгоритм хеширования перераспределения по столбцам (+ число разделов) в столбце целых чисел , Обычный HashPartitioner работает как положено, как вы можете видеть в исходном коде.

1 Ответ

0 голосов
/ 16 января 2019

Здесь нет ничего неожиданного. Как объяснено в Как работает HashPartitioner? Spark использует хэш (ключ) по модулю количества разделов, и неравномерное распределение, особенно для небольших наборов данных, не является неожиданным.

Ожидается также разница между Dataset и RDD, так как обе используют разные функции хеширования (то же самое).

Наконец

В результате scala / spark генерирует неожиданный результат и создает меньше разделов

не является правильным наблюдением. Количество созданных разделов точно соответствует запрошенному

scala> dfMyDataRepartitioned.rdd.getNumPartitions
res8: Int = 5

но пустые не будут видны в агрегации, потому что нет соответствующих значений.

...