Spark .saveTableAs на AWS EMR для записи в каталог клея не удается - PullRequest
0 голосов
/ 10 января 2020

В Scala я записываю свой DataFrame в S3, используя .saveTableAs, но похоже, что Glue не обновляется должным образом относительно местоположения, формата и т. Д. c базы данных. Для фона входящий набор данных составляет 1,5 ТБ в JSON, а форматом данных назначения является Parquet; записываются все файлы Parquet, хотя процесс переименования происходит довольно медленно.

val writeMode = "Overwrite"
val destinationFormatType = "parquet"
val s3PathBase = "s3://foo_bucket"
val currentDatabase = "bar"
val replaceTable = true
val jsonColumn = "json"
val partitionBy = Array("year", "month", "day", "hour")

val currentEvent = "fooBar"
val tableLowerCase = glueCatalog.fixTableName(currentEvent.asInstanceOf[String])
val s3Path = s"${s3PathBase}/${tableLowerCase}"
val tablePathInDb = s"${currentDatabase}.${tableLowerCase}"
println(tablePathInDb)

val currentEventDf = spark.read.json(
  dfWithJson
    .filter(col("event") === lit(currentEvent))
    .select(jsonColumn)
    .as[String]
)
// Adds partitions to have input data retain the same paths as the output data, since this is Kinesis
val dfToWrite = s3Reader.addKinesisPartitionsToDataFrameRows(currentEventDf, inputBasePath)

val dfWriter = dfToWrite
  .repartition(partitionBy.map(col): _*)
  .write
  .option("mode", "DROPMALFORMED")
  .mode(writeMode)
  .format(destinationFormatType)
  .option(
    "path",
    s3Path
  )
if (replaceTable) {
  println("\t- .saveAsTable")
  dfWriter
    .partitionBy(partitionBy: _*)
    .saveAsTable(tablePathInDb)
} else {
  println("\t- .insertInto")
  dfWriter.insertInto(tablePathInDb)
}

Когда данные записываются, они отображаются правильно и читаются в S3 через Spark, но Glue неправильно регистрирует таблицу Hive:

Имя foobar

Описание

Панель базы данных

Классификация Неизвестна

Местоположение s3: // foo_bucket / hive-metastore / bar. db / foobar- PLACEHOLDER

Соединение

Устаревшее №

Последнее обновление Четверг 09 09 16:55:23 GMT-800 2020

Формат ввода org. apache .had oop .mapred.SequenceFileInputFormat

Формат вывода org. apache .had oop .hive.ql.io.HiveSequenceFileOutputFormat

Сериализация Serde lib org. apache .had oop .hive.serde2.lazy.LazySimpleSerDe

Параметры Serde

режим DROPMALFORMED

путь s3: // foo_bucket / foobar

serialization.format 1

1 Ответ

0 голосов
/ 13 января 2020

Чтобы обойти эту проблему, неправильно склейте данные в файл последовательности:

val destinationFormatType = "hive"

И дополнительно добавьте следующее к dfWriter:

  .option(
    "fileFormat",
    destinationFormatType
  )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...