Невозможно запросить Glue Table из Афины после обновления разделов в Glue Job - PullRequest
2 голосов
/ 20 апреля 2020

У нас странная проблема с клеем / Афиной. Мы создали таблицу Glue в Cloud Formation без предопределенной схемы, чтобы использовать преимущества Dynami c Frame:

OurGlueTable:
Type: AWS::Glue::Table
Properties:
  DatabaseName: !Ref OurGlueDatabase
  CatalogId: !Ref AWS::AccountId
  TableInput:
    Name: "device"
    Description: "Device table"
    TableType: EXTERNAL_TABLE
    Parameters: {"classification": "glueparquet"}
    StorageDescriptor:
      Compressed: false
      InputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
      OutputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
      SerdeInfo:
        SerializationLibrary: "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
        Parameters: {serialization.format: 1}
      Location: !Ref ExternalTableDataS3Location
      StoredAsSubDirectories: false

И мы сохраняем данные в эту таблицу в scala spark Glue (версия 1.0) ETL:

  override def saveData(transformedDataFrame: DataFrame): Unit = {
    val frameWithDateColumns: DataFrame = addMissingColumns(transformedDataFrame)
    val dynamicFrame = DynamicFrame.apply(frameWithDateColumns, glueContext)
    val sink: DataSink = getSink
    sink.writeDynamicFrame(dynamicFrame)
  }
  private def addMissingColumns(transformedDataFrame: DataFrame): DataFrame = {
    import org.apache.spark.sql.functions._
    transformedDataFrame
      .withColumn("year", lit(processingYear))
      .withColumn("month", lit(processingMonth))
      .withColumn("day", lit(processingDay))
  }
  private def getSink = {
    val database = config.getString("database")
    val table = config.getString("table")
    val options: JsonOptions = JsonOptions(Map("partitionKeys" -> Seq("year", "month", "day"), "enableUpdateCatalog" -> true))
    val format = "glueparquet"
    val sink = glueContext.getCatalogSink(database = database, tableName = table, additionalOptions = options)
    sink.setFormat(format, JsonOptions(Map()))
    sink
  }

После первого задания происходит обновление схемы склеивания таблицы и раздела, и мы можем читать данные в Афине. После второго задания с теми же входными данными и другим параметром дня (который создаст другой раздел) схема таблицы склеивания остается прежней, и мы можем видеть новый раздел в консоли таблицы склеивания (что правильно), но на этот раз при чтении данных в Афине есть:

HIVE_METASTORE_ERROR: com.facebook.presto.spi.PrestoException: ошибка: тип ожидается в позиции 0 из «long», но «long» найден. (Служба: null; Код состояния: 0; Код ошибки: null; Идентификатор запроса: null)

, когда мы пытаемся обновить sh разделов с помощью MSCK REPAIR TABLE, возникает другая ошибка

FAILED: Ошибка выполнения, код возврата 1 из org. apache .had oop .hive.ql.exe c .DDLTask. Имя равно null

Что важно, если мы выполняем задания в течение двух дней параллельно, мы можем читать данные в Афине, эта проблема возникает только при наличии одного задания за другим.

Мы уже пытались изменить определение таблицы в Cloud Formation для создания таблицы с предопределенными разделами (год, месяц, день) и изменили StoredAsSubDirectories на true, но это не сработало.

Хотя При реализации нашего кода мы следовали статье https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html.

У кого-нибудь была подобная проблема?

...