Мне нужен способ записи следующей метки времени в Elasticsearch без повторного вызова сообщения об ошибке. Код ниже читает в файле JSON, а затем пишет в Elasticsearch.
Мой код:
import org.apache.spark.sql.types._
val schemaDF = spark.read.json("/tmp/LTPD/schema.json")
schemaDF.printSchema()
val schema = schemaDF.schema
//read from JSON file
val streamingDF = spark
.readStream
.schema(schema)
.json("/tmp/Directory/")
streamingDF
.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.trigger(Trigger.ProcessingTime(conf.getString("spark.trigger")))
.start("indexname/ourdoctype").awaitTermination()
Код работает для нулей в поле отметки времени, но жалуется, когда json содержит строку с 2019-08-15T09:40:13+00:00
или 2020-03-02T15:13:26Z
.
Образец Json
{
"name":"Jordan",
"date": "2019-06-01T00:00:00+00:00",
"gmt": "2020-03-02T15:13:26Z",
"skills":["Scala", "Spark", "Akka"]
}
Я вижу исключение:
failed to parse field [metaData.collectionDateUtc] of type [long] in
document with id org.elasticsearch.hadoop.rest.EsHadoopRemoteException:
illegal_argument_exception: For input string: "2019-08-15T09:40:13+00:00"