Мы используем MapR FS со скользящими томами, и необходимо выровнять файлы секционированного выходного паркета с соответствующими томами.
df
.write
.partitionBy("year", "month", "day", "hour")
.parquet("/data/nfs/{year}/{month}/{day}/datastore")
Идея заключалась в том, чтобы разрешать пути во время выполнения с помощью специального обработчика вывода:
class MaprParquetOutputComitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(
MaprParquetOutputComitter.resolvePath(outputPath),
context)
object MaprParquetOutputComitter extends StrictLogging {
def resolvePath(p: Path): Path = {
logger.info(p.toString)
// Retrieve year, month and day from path object
// and replace placeholders with extracted values
new Path(resolvedPath)
}
}
К сожалению, похоже, что outputPath
выглядит следующим образом "/data/nfs/{year}/{month}/{day}/datastore"
, но не как "/data/nfs/{year}/{month}/{day}/datastore/year=2018/month=6/day=25/hour=8"
.
Есть ли способ получить выходной путь, в котором есть часть разбиения сэтот подход (переопределение ParquetOutputCommitter
)?Или, может быть, есть другое решение для достижения этой цели?