Avro написать ошибку преобразования java.sql.Timestamp - PullRequest
1 голос
/ 01 апреля 2019

Мне нужно записать метку времени в раздел Kafka, а затем прочитать ее с нее. Для этого я определил схему Avro:

{ "namespace":"sample",
  "type":"record",
  "name":"TestData",
  "fields":[
    {"name": "update_database_time", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

Тем не менее, я получаю ошибку преобразования в строке provider.send:

java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long

Как я могу это исправить?

Вот код для записи метки времени в Kafka:

    val tmstpOffset = testDataDF
      .select("update_database_time")
      .orderBy(desc("update_database_time"))
      .head()
      .getTimestamp(0)

    val avroRecord = new GenericData.Record(parseAvroSchemaFromFile("/avro-offset-schema.json"))
    avroRecord.put("update_database_time", tmstpOffset)

    val producer = new KafkaProducer[String, GenericRecord](kafkaParams().asJava)
    val data = new ProducerRecord[String, GenericRecord]("app_state_test7", avroRecord)
    producer.send(data)

1 Ответ

1 голос
/ 01 апреля 2019

Avro не поддерживает время для отметки времени напрямую, но логически долго.Таким образом, вы можете конвертировать его в long и использовать его, как показано ниже.Функция unix_timestamp () используется для преобразования, но если у вас есть определенный формат даты, используйте перегруженную функцию unix_timestamp (col, dataformat).

import org.apache.spark.sql.functions._
val tmstpOffset = testDataDF
      .select((unix_timestamp("update_database_time")*1000).as("update_database_time"))
      .orderBy(desc("update_database_time"))
      .head()
      .getTimestamp(0)
...