Запись DataFrame в виде паркета создает пустые файлы - PullRequest
3 голосов
/ 19 июня 2019

Я пытаюсь оптимизировать производительность для работы Spark, используя метод bucketing . Я читаю файлы .parquet и .csv и выполняю некоторые преобразования. После того, как я делаю группировку и присоединяюсь к двум DataFrames. Затем я пишу DF в паркет, но у меня есть пустой файл ~500B вместо 500Mb.

  • Cloudera (cdh5.15.1)
  • Spark 2.3.0
  • Blob

    val readParquet = spark.read.parquet(inputP)
    readParquet
        .write
        .format("parquet")
        .bucketBy(23, "column")
        .sortBy("column")
        .mode(SaveMode.Overwrite)
        .saveAsTable("bucketedTable1")
    
    val firstTableDF = spark.table("bucketedTable1")
    
    val readCSV = spark.read.csv(inputCSV)
    readCSV
        .filter(..)
        .ordrerBy(someColumn)
    
        .write
        .format("parquet")
        .bucketBy(23, "column")
        .sortBy("column")
        .mode(SaveMode.Overwrite)
        .saveAsTable("bucketedTable2")
    
    val secondTableDF = spark.table("bucketedTable2")
    
    val resultDF = secondTableDF
        .join(firstTableDF, Seq("column"), "fullouter")
        .
        .
    resultDF
        .coalesce(1)
        .write
        .mode(SaveMode.Overwrite)
        .parquet(output)
    

Когда я запускаю задание Spark в командной строке, используя ssh, я получаю правильный результат, ~500Mb файл паркета, который я вижу, используя Hive. Если я запускаю ту же работу, используя oozie workflow, у меня появляется пустой файл (~500 Bytes). Когда я делаю .show() на своем resultDF, я вижу данные, но у меня есть пустой файл паркета.

+-----------+---------------+----------+
|       col1|          col2 |      col3|
+-----------+---------------+----------+
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|

Нет проблем при записи в паркет, если я не сохраняю данные в виде таблицы. Это происходит только с DF, созданным из таблицы.

Есть предложения?

Заранее спасибо за любые мысли!

1 Ответ

1 голос
/ 25 июня 2019

Я понял это для своего случая, я просто добавил опцию .option("path", "/sources/tmp_files_path").Теперь я могу использовать группирование, и у меня есть данные в моих выходных файлах.

readParquet
  .write
  .option("path", "/sources/tmp_files_path")
  .mode(SaveMode.Overwrite)
  .bucketBy(23, "column")
  .sortBy("column")
  .saveAsTable("bucketedTable1")
...