Убедитесь, что каждый раздел DF имеет одинаковое значение для столбца / набора столбцов - PullRequest
0 голосов
/ 05 января 2019

У меня есть фрейм данных со столбцами "месяц", "год", "день", "метка времени", ".... и другие столбцы", который был прочитан из таблицы паркета, разделенной на "год", "месяц" и "день". Мне нужны данные, разделенные таким образом, чтобы в каждом разделе были данные, соответствующие только одной комбинации «год», «месяц», «день».

У меня есть фрейм данных со столбцами "месяц", "год", "день", "метка времени", ".... и другие столбцы", который был прочитан из таблицы паркета, разделенной на "год", "месяц" и "день". Мне нужны данные, разделенные таким образом, чтобы в каждом разделе были данные, соответствующие только одной комбинации «год», «месяц», «день».

Затем я запускаю sortWithinPartitions на timestamp, а затем последовательно обрабатываю данные в каждом разделе (то есть с помощью mapPartitions). Проблема в repartition для столбцов не гарантирует, что раздел будет иметь строки только с одной комбинацией «месяц», «год» и «день». Чтобы обойти это, я сделал

df.repartition("year", "month", "day", MAX_INT)                                      
  .sortWithinPartitions($"timestamp")
  .rdd                                       
  .mapPartitions(sequential_processing_function)

Трудно легко проверить, работает ли он правильно, как ожидалось.

Вопрос в том, будет ли это работать должным образом, т. Е. Каждый раздел будет содержать только данные для одной комбинации "год", "месяц", "день".

Вот что я попробовал, основываясь на комментариях пользователя @ user6910411

val keyList = (df.select($"year", $"month", $"day")
                 .distinct()
                 .select(concat($"year", lit(" "),
                                $"month", lit(" "),
                                $"day").alias("partition_key"))
                .rdd
                .map(x => x.getString(0))
                .collect())
val keyIndexMap = collection.mutable.Map[String, Long]()
for (i <- keyList.indices) keyIndexMap(keyList(i)) = i
var keyIndexMapBC = sc.broadcast(keyIndexMap)

class ExactPartitioner[V]() extends Partitioner {
  def getPartition(key: Any): Int = {
    return keyIndexMapBC.value(key.asInstanceOf[String]).toInt
  }

  def numPartitions(): Int = {
      return keyIndexMapBC.value.size
  }
}
val df_partitioned =
    spark.createDataFrame(df,                                                        
        .select("year", "month", "day", "timestamp", "other_columns")                                                                                                                  
        .rdd.map(row => (row.getAs[String]("year") + " " +                                                                         
                         row.getAs[String]("month") + " " +                                                                          
                         row.getAs[String]("day"), row))
        .partitionBy(new ExactPartitioner).values,                                                       
        intermediate_data_schema)

С этим df_partitioned.rdd.partitions.size дает мне правильное количество разделов.

Опять же, как я могу проверить, все ли прошло правильно и работает ли он как ожидалось?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...