У меня есть фрейм данных со столбцами "месяц", "год", "день", "метка времени", ".... и другие столбцы", который был прочитан из таблицы паркета, разделенной на "год", "месяц" и "день".
Мне нужны данные, разделенные таким образом, чтобы в каждом разделе были данные, соответствующие только одной комбинации «год», «месяц», «день».
У меня есть фрейм данных со столбцами "месяц", "год", "день", "метка времени", ".... и другие столбцы", который был прочитан из таблицы паркета, разделенной на "год", "месяц" и "день".
Мне нужны данные, разделенные таким образом, чтобы в каждом разделе были данные, соответствующие только одной комбинации «год», «месяц», «день».
Затем я запускаю sortWithinPartitions
на timestamp
, а затем последовательно обрабатываю данные в каждом разделе (то есть с помощью mapPartitions). Проблема в repartition
для столбцов не гарантирует, что раздел будет иметь строки только с одной комбинацией «месяц», «год» и «день».
Чтобы обойти это, я сделал
df.repartition("year", "month", "day", MAX_INT)
.sortWithinPartitions($"timestamp")
.rdd
.mapPartitions(sequential_processing_function)
Трудно легко проверить, работает ли он правильно, как ожидалось.
Вопрос в том, будет ли это работать должным образом, т. Е. Каждый раздел будет содержать только данные для одной комбинации "год", "месяц", "день".
Вот что я попробовал, основываясь на комментариях пользователя @ user6910411
val keyList = (df.select($"year", $"month", $"day")
.distinct()
.select(concat($"year", lit(" "),
$"month", lit(" "),
$"day").alias("partition_key"))
.rdd
.map(x => x.getString(0))
.collect())
val keyIndexMap = collection.mutable.Map[String, Long]()
for (i <- keyList.indices) keyIndexMap(keyList(i)) = i
var keyIndexMapBC = sc.broadcast(keyIndexMap)
class ExactPartitioner[V]() extends Partitioner {
def getPartition(key: Any): Int = {
return keyIndexMapBC.value(key.asInstanceOf[String]).toInt
}
def numPartitions(): Int = {
return keyIndexMapBC.value.size
}
}
val df_partitioned =
spark.createDataFrame(df,
.select("year", "month", "day", "timestamp", "other_columns")
.rdd.map(row => (row.getAs[String]("year") + " " +
row.getAs[String]("month") + " " +
row.getAs[String]("day"), row))
.partitionBy(new ExactPartitioner).values,
intermediate_data_schema)
С этим df_partitioned.rdd.partitions.size
дает мне правильное количество разделов.
Опять же, как я могу проверить, все ли прошло правильно и работает ли он как ожидалось?