Фильтрация пустых разделов в RDD - PullRequest
1 голос
/ 29 мая 2020

Есть ли способ отфильтровать пустые разделы в RDD? У меня остались пустые разделы после разбиения на разделы, и я не могу использовать их в методе действий.

Я использую Apache Spark в Scala

Ответы [ 3 ]

1 голос
/ 30 мая 2020

Это мой пример данных

  val sc = spark.sparkContext
  val myDataFrame = spark.range(20).toDF("mycol").repartition($"mycol")
  myDataFrame.show(false)

Вывод :

+-----+
|mycol|
+-----+
|19   |
|0    |
|7    |
|6    |
|9    |
|17   |
|5    |
|1    |
|10   |
|3    |
|12   |
|8    |
|11   |
|2    |
|4    |
|13   |
|18   |
|14   |
|15   |
|16   |
+-----+

В приведенном выше коде при повторном разбиении столбца тогда будет создано 200 разделов, поскольку spark.sql.shuffle.partitions = 200 в том, что многие из них не используются или пустые разделы, так как данные представляют собой всего лишь 10 чисел (мы пытаемся уместить 20 номеров в 200 разделов, значит .... большинство разделов пустые .. ..: -))

1) Подготовьте длинную аккумуляторную переменную для быстрого подсчета непустых разделов.
2) Добавьте все непустые разделы в аккумуляторную переменную, например ниже пример.

 val nonEmptyPartitions = sc.longAccumulator("nonEmptyPartitions")
 myDataFrame.foreachPartition(partition =>
    if (partition.length > 0) nonEmptyPartitions.add(1))
  • отбрасывать непустые разделы (означает объединять их ... меньше перемешивания / минимальное перемешивание).
  • распечатать их.

val finalDf = myDataFrame.coalesce(nonEmptyPartitions.value.toInt)
println(s"nonEmptyPart : ${nonEmptyPartitions.value.toInt}")
println(s"df.rdd.partitions.length :  ${myDataFrame.rdd.getNumPartitions}")
println(s"finalDf.rdd.partitions.length  :  ${finalDf.rdd.getNumPartitions}")

распечатать их ...

Результат :

nonEmptyPart : 20
df.rdd.partitions.length :  200
finalDf.rdd.partitions.length  :  20

Доказательство того, что все непустые разделы удалены . ..

  myDataFrame.withColumn("partitionId", org.apache.spark.sql.functions.spark_partition_id)
.groupBy("partitionId")
.count
.show

Результат распечатанного количества записей по разделам:

 +-----------+-----+
|partitionId|count|
+-----------+-----+
|128        |1    |
|190        |1    |
|140        |1    |
|164        |1    |
|5          |1    |
|154        |1    |
|112        |1    |
|107        |1    |
|4          |1    |
|49         |1    |
|69         |1    |
|77         |1    |
|45         |1    |
|121        |1    |
|143        |1    |
|58         |1    |
|11         |1    |
|150        |1    |
|68         |1    |
|116        |1    |
+-----------+-----+

Примечание:

* 104 9 *

Использование spark_partition_id предназначено только для демонстрации / отладки, а не для производственных целей.

Я уменьшил 200 разделов (из-за переделки по столбцу) до 20 непустых разделов.

Вывод:

Наконец, вы избавились от лишних пустых разделов, на которых нет данных, и избежали ненужного расписания для фиктивных задач на пустых разделах.

1 голос
/ 29 мая 2020

Исходя из той небольшой информации, которую вы предоставляете, я могу подумать о двух вариантах. Используйте mapPartitions и просто ловите пустые итераторы и возвращая их, работая с непустыми.

rdd.mapPartitions { case iter => if(iter.isEmpty) { iter } else { ??? } }

Или вы можете использовать repartition, чтобы избавиться от пустых разделов.

rdd.repartition(10) // or any proper number
0 голосов
/ 29 мая 2020

Если вы не знаете отдельные значения в столбце и wi sh, чтобы избежать пустых разделов, вы можете использовать countApproxDistinct() как:

df.repartition(df.rdd.countApproxDistinct().toInt)

Если вы wi sh для фильтрации существующие пустые разделы и повторное разбиение, вы можете использовать как решение, предложенное Sasa

ИЛИ:

df.repartition(df.mapPartitions(part => List(part.length).iterator).collect().count(_ != 0)).df.getNumPartitions)

Однако в более позднем случае разделы могут содержать или не содержать записи по значению.

...