Как сохранить geo_point в Elasticsearch из PySpark? - PullRequest
0 голосов
/ 06 сентября 2018

Я хочу создать DataFrame в PySpark и сохранить его в Elasticsearch:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .config("es.nodes","localhost") \
    .config("es.port",9200) \
    .config("es.nodes.wan.only","true") \
    .getOrCreate()

schema = StructType([StructField('id', StringType()), \
                     StructField('timestamp',LongType()), \
                     StructField('coordinates',ArrayType(DoubleType())])
rows = [Row(id="11", timestamp=1523975430000, coordinates = [41.5555, 2.1522])]

df = spark.createDataFrame(rows, schema)

df.write \
        .format("org.elasticsearch.spark.sql") \
        .mode('append') \
        .option("es.resource", "myindex/intensity") \
        .save()

Это индекс myindex с его отображением intensity в Elasticsearch, где я хочу сохранить данные:

{
    "mappings": {
        "intensity": {

        "properties": {
          "id": {
            "type":"keyword"
          },
          "timestamp": {
            "type":"date"
          },
          "coordinates": {
            "type":"geo_point"
          }
        }
      }
    }
}

Проблема возникает с geo_point. Это должно быть сохранено следующим образом:

"coordinates": {
  "lat": 41.5555,
  "lon": 2.1522
}

Но в моем случае это сохраняется следующим образом:

"coordinates": [
  41.5555,
  2.1522
]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...