Я пытаюсь оптимизировать производительность для работы Spark, используя метод bucketing . Я читаю файлы .parquet
и .csv
и выполняю некоторые преобразования. После того, как я делаю группировку и присоединяюсь к двум DataFrames. Затем я пишу DF в паркет, но у меня есть пустой файл ~500B
вместо 500Mb
.
- Cloudera (cdh5.15.1)
- Spark 2.3.0
Blob
val readParquet = spark.read.parquet(inputP)
readParquet
.write
.format("parquet")
.bucketBy(23, "column")
.sortBy("column")
.mode(SaveMode.Overwrite)
.saveAsTable("bucketedTable1")
val firstTableDF = spark.table("bucketedTable1")
val readCSV = spark.read.csv(inputCSV)
readCSV
.filter(..)
.ordrerBy(someColumn)
.write
.format("parquet")
.bucketBy(23, "column")
.sortBy("column")
.mode(SaveMode.Overwrite)
.saveAsTable("bucketedTable2")
val secondTableDF = spark.table("bucketedTable2")
val resultDF = secondTableDF
.join(firstTableDF, Seq("column"), "fullouter")
.
.
resultDF
.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.parquet(output)
Когда я запускаю задание Spark в командной строке, используя ssh
, я получаю правильный результат, ~500Mb
файл паркета, который я вижу, используя Hive. Если я запускаю ту же работу, используя oozie workflow, у меня появляется пустой файл (~500 Bytes
).
Когда я делаю .show()
на своем resultDF
, я вижу данные, но у меня есть пустой файл паркета.
+-----------+---------------+----------+
| col1| col2 | col3|
+-----------+---------------+----------+
|33601234567|208012345678910| LOL|
|33601234567|208012345678910| LOL|
|33601234567|208012345678910| LOL|
Нет проблем при записи в паркет, если я не сохраняю данные в виде таблицы. Это происходит только с DF, созданным из таблицы.
Есть предложения?
Заранее спасибо за любые мысли!