Как установить логический тип в схеме spark-avro 2.4? - PullRequest
0 голосов
/ 06 февраля 2019

Мы читаем информацию о метках времени из файлов avro в нашем приложении.Я нахожусь в процессе тестирования обновления от Spark 2.3.1 до Spark 2.4, которое включает в себя недавно встроенную интеграцию spark-avro.Однако я не могу понять, как сказать схеме avro, что я хочу, чтобы метки времени имели логический тип «timestamp-millis», а не «timestamp-micros» по умолчанию.

Просто взглянув на тестовые файлы avroВ Spark 2.3.1 с использованием пакета Databricks spark-avro 4.0.0 у нас были следующие поля / схема:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

Время поиска составляло миллисекунды с момента, когда эпоха сохранялась как long.Все было хорошо.

Когда я поднял вещи до Spark 2.4 и встроенных пакетов spark-avro 2.4.0, у меня появились следующие новые поля / схемы:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

Как одинМожно видеть, что базовый тип все еще длинный, но теперь он дополнен логическим типом «timestamp-micros».Это именно то, что, как говорят в заметках о выпуске произойдет, однако я не могу найти способ указать схему для использования опции 'timestamp-millis'.

Это становится проблемой, когда я записываю в файл avro объект Timestamp, инициализированный, скажем, через 10000 секунд после эпохи, он будет считан обратно как 10000000 секунд.В соответствии с 2.3.1 / databricks-avro, это был просто длинный, без информации, связанной с ним, поэтому он вышел так же, как и вошел.

В настоящее время мы строим схему, отражая объект интереса.следующим образом:

val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]

Я попытался расширить это, создав модифицированную схему, которая пыталась заменить StructField, соответствующий записи searchTime, следующим образом:

    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

Однако объект StructField определенв spark.sql.types нет понятия логического типа, который может увеличить в нем dataType.

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

Я также пытался создать схему из представления JSON двумя способами:

val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

Первая попытка состояла в том, чтобы просто создать DataType из этого

// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

Это не удалось, поскольку он не смог создать StructType для узла searchTime, поскольку в нем есть «logicType».Вторая попытка состояла в том, чтобы просто создать схему, передав необработанную строку JSON.

spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

Не удалось сказать, что:

mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

Я обнаружил, что в искре -avro API есть способ ПОЛУЧИТЬ логический тип из схемы, но он не может понять, как его установить.

Как вы можете видеть из моих неудачных попыток выше, я попытался использовать Schema.Parser для создания объекта схемы avro, но единственным допустимым типом в spark.read.schema являются String и StructType.

Если кто-нибудь может дать представление о том, как изменить / указать этот логический тип, я был бы очень признателен.Спасибо

1 Ответ

0 голосов
/ 06 февраля 2019

Хорошо, я думаю, что ответил на свой вопрос.Когда я модифицировал программно построенную схему, чтобы использовать явный тип метки времени

val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

, я не изменил логику, когда мы выполняли чтение, когда у нас был объект Row, из которого мы читали обратно.Первоначально мы читали Long и конвертировали его в Timestamp, где все пошло не так, поскольку он считывал Long в микросекундах, что делало его в 1000 раз больше, чем мы предполагали.Изменение нашего чтения для чтения объекта Timestamp напрямую позволяет основной логике учесть это, забирая его из наших (моих) рук.Итак:

// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS
...