Как разделить большой файл паркета на несколько паркетов и сохранить их в разных путях по столбцу времени - PullRequest
0 голосов
/ 03 июля 2019

Мой файл sparquet вот так

идентификатор, имя, дата

1, а, 1980-09-08

2, б, 1980-09-08

3, с, 2017-09-09

Надеюсь, что выходной файл вот так

enter image description here

папка 19800908 содержит данные

идентификатор, имя, дата

1, а, 1980-09-08

2, б, 1980-09-08

и папка 20170909 содержит данные

идентификатор, имя, дата

3, с, 2017-09-09

Я знаю, что может сгруппировать ключ date, но не знаю, как вывести несколько паркетных файлов, используйте такой класс MultipleTextOutputFormat

Я не хочу зацикливать циклы клавиш, которые замедляются и требуют много памяти

теперь код такой

   val input = sqlContext.read.parquet(sourcePath)
      .persist(StorageLevel.DISK_ONLY)

    val keyRows: RDD[(Long, Row)] =
      input.mapPartitions { partition =>
        partition.flatMap { row =>
          val key =  format.format(row.getDate(3)).toLong
          Option((key, row))
        }
      }.persist(StorageLevel.DISK_ONLY)

    val keys = keyRows.keys.distinct().collect()

    for (key <- keys) {
      val rows = keyRows.filter { case (_key, _) => _key == key }.map(_._2)
      val df = sqlContext.createDataFrame(rows, input.schema)
      val path = s"${outputPrefix}/$key"
      HDFSUtils.deleteIfExist(path)
      df.write.parquet(path)
    }

Если я использую MultipleTextOutputFormat, вывод будет следующим, чего я не хочу

enter image description here

    keyRows.groupByKey()
      .saveAsHadoopFile(conf.getOutputPrefixDirectory, classOf[String], classOf[String],
        classOf[SimpleMultipleTextOutputFormat[_, _]])
public class SimpleMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
//        return super.generateFileNameForKeyValue(key, value, name);
        return key.toString();
    }
}

Ответы [ 2 ]

1 голос
/ 03 июля 2019

Можно использовать запись с разделенным столбцом:

df.write.partitionBy("dateString").parquet("/path/to/file").

Разница - имя папки будет выглядеть как «dateString = 2017-09-09», и перед сохранением необходимо создать новый строковый столбец «dateString».

0 голосов
/ 03 июля 2019

из этого поста запись данных спарк-раздела по отметке времени

    input
      .withColumn("_key", date_format(col(partitionField), format.toPattern))
      .write
      .partitionBy("_key")
      .parquet(conf.getOutputPrefixDirectory)

enter image description here

Но как удалить имя папки '_ke = '

...