Мы читаем информацию о метках времени из файлов avro в нашем приложении.Я нахожусь в процессе тестирования обновления от Spark 2.3.1 до Spark 2.4, которое включает в себя недавно встроенную интеграцию spark-avro.Однако я не могу понять, как сказать схеме avro, что я хочу, чтобы метки времени имели логический тип «timestamp-millis», а не «timestamp-micros» по умолчанию.
Просто взглянув на тестовые файлы avroВ Spark 2.3.1 с использованием пакета Databricks spark-avro 4.0.0 у нас были следующие поля / схема:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
Время поиска составляло миллисекунды с момента, когда эпоха сохранялась как long.Все было хорошо.
Когда я поднял вещи до Spark 2.4 и встроенных пакетов spark-avro 2.4.0, у меня появились следующие новые поля / схемы:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
Как одинМожно видеть, что базовый тип все еще длинный, но теперь он дополнен логическим типом «timestamp-micros».Это именно то, что, как говорят в заметках о выпуске произойдет, однако я не могу найти способ указать схему для использования опции 'timestamp-millis'.
Это становится проблемой, когда я записываю в файл avro объект Timestamp, инициализированный, скажем, через 10000 секунд после эпохи, он будет считан обратно как 10000000 секунд.В соответствии с 2.3.1 / databricks-avro, это был просто длинный, без информации, связанной с ним, поэтому он вышел так же, как и вошел.
В настоящее время мы строим схему, отражая объект интереса.следующим образом:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
Я попытался расширить это, создав модифицированную схему, которая пыталась заменить StructField, соответствующий записи searchTime, следующим образом:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
Однако объект StructField определенв spark.sql.types нет понятия логического типа, который может увеличить в нем dataType.
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
Я также пытался создать схему из представления JSON двумя способами:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin
Первая попытка состояла в том, чтобы просто создать DataType из этого
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
Это не удалось, поскольку он не смог создать StructType для узла searchTime, поскольку в нем есть «logicType».Вторая попытка состояла в том, чтобы просто создать схему, передав необработанную строку JSON.
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
Не удалось сказать, что:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^
Я обнаружил, что в искре -avro API есть способ ПОЛУЧИТЬ логический тип из схемы, но он не может понять, как его установить.
Как вы можете видеть из моих неудачных попыток выше, я попытался использовать Schema.Parser для создания объекта схемы avro, но единственным допустимым типом в spark.read.schema являются String и StructType.
Если кто-нибудь может дать представление о том, как изменить / указать этот логический тип, я был бы очень признателен.Спасибо