В Scala я записываю свой DataFrame в S3, используя .saveTableAs
, но похоже, что Glue не обновляется должным образом относительно местоположения, формата и т. Д. c базы данных. Для фона входящий набор данных составляет 1,5 ТБ в JSON, а форматом данных назначения является Parquet; записываются все файлы Parquet, хотя процесс переименования происходит довольно медленно.
val writeMode = "Overwrite"
val destinationFormatType = "parquet"
val s3PathBase = "s3://foo_bucket"
val currentDatabase = "bar"
val replaceTable = true
val jsonColumn = "json"
val partitionBy = Array("year", "month", "day", "hour")
val currentEvent = "fooBar"
val tableLowerCase = glueCatalog.fixTableName(currentEvent.asInstanceOf[String])
val s3Path = s"${s3PathBase}/${tableLowerCase}"
val tablePathInDb = s"${currentDatabase}.${tableLowerCase}"
println(tablePathInDb)
val currentEventDf = spark.read.json(
dfWithJson
.filter(col("event") === lit(currentEvent))
.select(jsonColumn)
.as[String]
)
// Adds partitions to have input data retain the same paths as the output data, since this is Kinesis
val dfToWrite = s3Reader.addKinesisPartitionsToDataFrameRows(currentEventDf, inputBasePath)
val dfWriter = dfToWrite
.repartition(partitionBy.map(col): _*)
.write
.option("mode", "DROPMALFORMED")
.mode(writeMode)
.format(destinationFormatType)
.option(
"path",
s3Path
)
if (replaceTable) {
println("\t- .saveAsTable")
dfWriter
.partitionBy(partitionBy: _*)
.saveAsTable(tablePathInDb)
} else {
println("\t- .insertInto")
dfWriter.insertInto(tablePathInDb)
}
Когда данные записываются, они отображаются правильно и читаются в S3 через Spark, но Glue неправильно регистрирует таблицу Hive:
Имя foobar
Описание
Панель базы данных
Классификация Неизвестна
Местоположение s3: // foo_bucket / hive-metastore / bar. db / foobar- PLACEHOLDER
Соединение
Устаревшее №
Последнее обновление Четверг 09 09 16:55:23 GMT-800 2020
Формат ввода org. apache .had oop .mapred.SequenceFileInputFormat
Формат вывода org. apache .had oop .hive.ql.io.HiveSequenceFileOutputFormat
Сериализация Serde lib org. apache .had oop .hive.serde2.lazy.LazySimpleSerDe
Параметры Serde
режим DROPMALFORMED
путь s3: // foo_bucket / foobar
serialization.format 1