Spark DataFrame - Как разбить данные на основе условия - PullRequest
0 голосов
/ 17 октября 2018

Имейте некоторый набор данных сотрудника.в том, что мне нужно разделить зарплату сотрудника на основе некоторых условий.Создан DataFrame и преобразован в Пользовательский объект DataFrame.Создан пользовательский раздел по зарплате.

class SalaryPartition(override val numPartitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int =
    {
      import com.csc.emp.spark.tutorial.PartitonObj._
      key.asInstanceOf[Emp].EMPLOYEE_ID match {
        case salary if salary < 10000 => 1
        case salary if salary >= 10001 && salary < 20000 => 2
        case _ => 3
      }

    }

}

Вопрос, как я могу вызвать \ вызвать мой обычный раздел.Не удалось найти разделBy в кадре данных.Есть какой-нибудь альтернативный способ?

1 Ответ

0 голосов
/ 18 октября 2018

Просто код для моего комментария:

val empDS = List(Emp(5, 1000), Emp(4, 15000), Emp(3, 30000), Emp(2, 2000)).toDS()
println(s"Original partitions number: ${empDS.rdd.partitions.size}")
println("-- Original partition: data --")
empDS.rdd.mapPartitionsWithIndex((index, it) => {
  it.foreach(r => println(s"Partition $index: $r")); it
}).count()

val getSalaryGrade = (salary: Int) => salary match {
  case salary if salary < 10000 => 1
  case salary if salary >= 10001 && salary < 20000 => 2
  case _ => 3
}
val getSalaryGradeUDF = udf(getSalaryGrade)
val salaryGraded = empDS.withColumn("salaryGrade", getSalaryGradeUDF($"salary"))

val repartitioned = salaryGraded.repartition($"salaryGrade")
println
println(s"Partitions number after: ${repartitioned.rdd.partitions.size}")
println("-- Reparitioned partition: data --")

repartitioned.as[Emp].rdd.mapPartitionsWithIndex((index, it) => {
  it.foreach(r => println(s"Partition $index: $r")); it
}).count()

Вывод:

Original partitions number: 2
-- Original partition: data --
Partition 1: Emp(3,30000)
Partition 0: Emp(5,1000)
Partition 1: Emp(2,2000)
Partition 0: Emp(4,15000)

Partitions number after: 5
-- Reparitioned partition: data --
Partition 1: Emp(3,30000)
Partition 3: Emp(5,1000)
Partition 3: Emp(2,2000)
Partition 4: Emp(4,15000)

Примечание: угадать, возможно несколько разделов с одним и тем же "salaryGrade".

Совет:"groupBy" или аналогичный выглядит как более надежное решение.

Для использования с сущностями набора данных можно использовать "groupByKey":

empDS.groupByKey(x => getSalaryGrade(x.salary)).mapGroups((index, it) => {
  it.foreach(r => println(s"Group $index: $r")); index
}).count()

Вывод:

Group 1: Emp(5,1000)
Group 3: Emp(3,30000)
Group 1: Emp(2,2000)
Group 2: Emp(4,15000)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...