Я создал образец документа на основе ваших данных в версии ES 6.4 / Spark 2.1 и использовал приведенный ниже код, чтобы прочитать поле GenerateTime
как text
вместо датывведите искру.
Отображение в ES
PUT somedateindex
{
"mappings": {
"mydocs":{
"properties": {
"GenerateTime": {
"type": "date",
"format": "yyyy/MM/dd HH:mm:ss"
}
}
}
}
}
Обратите внимание, что поле имеет тип date
в ES.
Spark Code для использования поля даты в ES в качестве строки
Обратите внимание, что я использовал опцию config ("es.mapping.date.rich" , false )
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.resource.read","somedateindex")
.option("es.nodes", "some_host_name")
.option("es.mapping.date.rich", false)
.option("es.port","9200")
.load()
df.show()
df.printSchema()
Результат кода искры на моей консоли Eclipse:
19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
| GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+
root
|-- GenerateTime: string (nullable = true)
19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....
Обратите внимание, что printSchema
показывает, что в таблице есть один столбец GenerateTime
, которыйимеет тип string
.
Если вы не хотите идти дальше и менять сопоставления, вышеприведенное должно вам помочь.
Я рекомендую иметь поля даты в формате даты, а не в тексте, и это тоже в формате, поддерживаемом ISO-8601, чтобы при выводе типа в итоге вы получали данные в правильном типе в Spark и моглипросто сосредоточьтесь на бизнес-логике, часто правильное решение заключается в том, как мы храним данные, а не в том, как мы их обрабатываем.
Spark-код для преобразования String в Timestamp / Date
Однако, если по какой-то причине вы не можете изменить сопоставления из источника, то естьasticsearch, вы можете дополнительно добавить приведенный ниже код для преобразованиястроковое значение в метку времени, используя приведенный ниже код:
import org.apache.spark.sql.functions._
//String into Timestamp Transformation
val df2_timestamp = df.withColumn("GenerateTime_timestamp", from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
df2_timestamp.show(false)
df2_timestamp.printSchema();
Если вы запустите приведенный выше код, вы увидите вывод, показанный ниже:
19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+
root
|-- GenerateTime: string (nullable = true)
|-- GenerateTime_timestamp: timestamp (nullable = true)
19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook
Также обратите внимание, что мое решениев Скале.Дайте мне знать, если это поможет!