Я пытаюсь сохранить паркетный фрейм данных Spark с разделением во временный каталог для модульных тестов, однако по какой-то причине разделы не создаются. Сами данные сохраняются в каталоге и могут использоваться для тестов. Вот метод, который я создал для этого:
def saveParquet(df: DataFrame, partitions: String*): String = {
val path = createTempDir()
df.repartition(1).parquet(path)(partitions: _*)
path
}
val feedPath: String = saveParquet(feedDF.select(feed.schema), "processing_time")
Этот метод работает для различных фреймов данных с различными схемами, но по какой-то причине не генерирует разделы для этого. Я вышел из результирующего пути, и он выглядит так:
/var/folders/xg/fur_diuhg83b2ba15ih2rt822000dhst/T/testutils-samples8512758291/jf81n7bsj-95hs-573n-b73h-7531ug04515
Но он должен выглядеть так:
/var/folders/xg/fur_diuhg83b2ba15ih2rt822000dhst/T/testutils-samples8512758291/jf81n7bsj-95hs-573n-b73h-7531ug04515/processing_time=1591714800000/part-some-random-numbersnappy.parquet
Я проверил, что данные и все столбцы прочитаны просто отлично перед разделением, эта проблема возникает, как только создается вызов раздела. Кроме того, я запустил регулярное выражение в каталогах, которые не прошли с ошибкой сопоставления на тестовых образцах - s".*processing_time=([0-9]+)/.*parquet".r
Так в чем может быть причина этой проблемы? Как еще я могу разделить фрейм данных?
Схема фрейма данных выглядит так:
val schema: StructType = StructType(
Seq(
StructField("field1", StringType),
StructField("field2", LongType),
StructField("field3", StringType),
StructField("field4Id", IntegerType, nullable = true),
StructField("field4", FloatType, nullable = true),
StructField("field5Id", IntegerType, nullable = true),
StructField("field5", FloatType, nullable = true),
StructField("field6Id", IntegerType, nullable = true),
StructField("field6", FloatType, nullable = true),
//partition keys
StructField("processing_time", LongType)
)
)